You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by jjthomas <gi...@git.apache.org> on 2016/06/28 23:51:32 UTC

[GitHub] spark pull request #13957: structured streaming event time window example

GitHub user jjthomas opened a pull request:

    https://github.com/apache/spark/pull/13957

    structured streaming event time window example

    ## What changes were proposed in this pull request?
    
    A structured streaming example with event time windowing.
    
    
    ## How was this patch tested?
    
    Run locally
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jjthomas/spark current

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13957.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13957
    
----
commit 757245908ded2fbdf08a06c7119311556e634391
Author: James Thomas <ja...@gmail.com>
Date:   2016-06-28T23:49:48Z

    EventTimeWindow example

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61597 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61597/consoleFull)** for PR 13957 at commit [`3e15cc9`](https://github.com/apache/spark/commit/3e15cc925149c6cf6446b4fa24f266c3f3fd9077).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69362821
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,103 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <optional slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap. If
    + <slide duration> is not provided, it defaults to <window duration>.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <optional slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5 and len(sys.argv) != 4:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <optional slide duration in seconds>")
    +        print(msg, file=sys.stderr)
    +        exit(-1)
    +
    +    host = sys.argv[1]
    +    port = int(sys.argv[2])
    +    windowSize = int(sys.argv[3])
    +    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
    +    if slideSize > windowSize:
    +        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
    +    windowArg = '{} seconds'.format(windowSize)
    +    slideArg = '{} seconds'.format(slideSize)
    +
    +
    +    spark = SparkSession\
    +        .builder\
    +        .appName("StructuredNetworkWordCountWindowed")\
    +        .getOrCreate()
    +
    +    # Create DataFrame representing the stream of input lines from connection to host:port
    +    lines = spark\
    +        .readStream\
    +        .format('socket')\
    +        .option('host', host)\
    +        .option('port', port)\
    +        .option('includeTimestamp', 'true')\
    +        .load()
    +
    +    # Split the lines into words, retaining timestamps
    +    words = lines.select(
    +        # explode turns each item in an array into a separate row
    --- End diff --
    
    move this above with the other comment. and bit more explanation. 
    e.g.
    "split() splits each line into an array, and explode() turns the array into multiple rows"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61789/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: structured streaming event time window example

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/13957


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69352894
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is 60, 30
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration in seconds> <slide duration in seconds>")
    +      System.exit(1)
    +    }
    +
    +    val host = args(0)
    +    val port = args(1).toInt
    +    val windowSize = args(2).toInt
    +    val slideSize = args(3).toInt
    +    if (slideSize > windowSize) {
    +      System.err.println("<slide duration> must be less than or equal to <window duration>")
    +    }
    +
    +    val spark = SparkSession
    +      .builder
    +      .appName("StructuredNetworkWordCountWindowed")
    +      .getOrCreate()
    +
    +    import spark.implicits._
    +
    +    // Create DataFrame representing the stream of input lines from connection to host:port
    +    val lines = spark.readStream
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load().as[(String, Timestamp)]
    +
    +    // Split the lines into words, retaining timestamps
    +    val words = lines.flatMap(line =>
    +      line._1.split(" ").map(word => (word, line._2))
    +    ).toDF("word", "timestamp")
    +
    +    // Group the data by window and word and compute the count of each group
    +    val windowedCounts = words.groupBy(
    +      window(words.col("timestamp"), s"$windowSize seconds", s"$slideSize seconds"),
    --- End diff --
    
    try the $ notation. update other example if it works. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61638 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61638/consoleFull)** for PR 13957 at commit [`8bb543d`](https://github.com/apache/spark/commit/8bb543dee4afda642f4dac48af3306ae293dd4d7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69362577
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.examples.sql.streaming;
    +
    +import org.apache.spark.api.java.function.FlatMapFunction;
    +import org.apache.spark.sql.*;
    +import org.apache.spark.sql.functions;
    +import org.apache.spark.sql.streaming.StreamingQuery;
    +import scala.Tuple2;
    +
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
    + *   <optional slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap. If
    + * <slide duration> is not provided, it defaults to <window duration>.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <optional slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is 60, 30
    --- End diff --
    
    nit: make it 10 and 5, so that user does not have to wait for 60 seconds to see multiple windows.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69262396
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is "1 minute",
    + * "30 seconds"
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration> <slide duration>")
    +      System.exit(1)
    +    }
    +
    +    val host = args(0)
    +    val port = args(1).toInt
    +    val windowSize = args(2)
    +    val slideSize = args(3)
    +
    +    val spark = SparkSession
    +      .builder
    +      .appName("StructuredNetworkWordCountWindowed")
    +      .getOrCreate()
    +
    +    import spark.implicits._
    +
    +    // Create DataFrame representing the stream of input lines from connection to host:port
    +    val lines = spark.readStream
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load().as[(String, Timestamp)]
    +
    +    // Split the lines into words, retaining timestamps
    +    val words = lines.flatMap(line =>
    +      line._1.split(" ")
    --- End diff --
    
    put these two in the same line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69353077
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,102 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <slide duration in seconds>")
    +        print(msg, file=sys.stderr)
    +        exit(-1)
    +
    +    host = sys.argv[1]
    +    port = int(sys.argv[2])
    +    windowSize = int(sys.argv[3])
    +    slideSize = int(sys.argv[4])
    +    if slideSize > windowSize:
    +        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
    +
    +    spark = SparkSession\
    +        .builder\
    +        .appName("StructuredNetworkWordCountWindowed")\
    +        .getOrCreate()
    +
    +    # Create DataFrame representing the stream of input lines from connection to host:port
    +    lines = spark\
    +        .readStream\
    +        .format('socket')\
    +        .option('host', host)\
    +        .option('port', port)\
    +        .option('includeTimestamp', 'true')\
    +        .load()
    +
    +    # Split the lines into words, retaining timestamps
    +    words = lines.select(
    +        # explode turns each item in an array into a separate row
    +        explode(
    +            split(lines.value, ' ')
    +        ).alias('word'),
    +        lines.timestamp
    +    )
    +
    +    # Group the data by window and word and compute the count of each group
    +    windowedCounts = words.groupBy(
    +        window(words.timestamp, '{} seconds'.format(windowSize),
    --- End diff --
    
    move the string generation earlier, around line 58. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69352584
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
           parameters: Map[String, String]): Source = {
         val host = parameters("host")
         val port = parameters("port").toInt
    -    new TextSocketSource(host, port, sqlContext)
    +    new TextSocketSource(host, port,
    +      parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
    --- End diff --
    
    will throw error if includeTimestamp is not parseable as boolean. 
    also why is this condition check in two places? there should be one place checking the option and handling parsing errors and all.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61790 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61790/consoleFull)** for PR 13957 at commit [`8f97b66`](https://github.com/apache/spark/commit/8f97b66f55ececd91434a3f9a3b28dd6c5412d46).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69362419
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.examples.sql.streaming;
    +
    +import org.apache.spark.api.java.function.FlatMapFunction;
    +import org.apache.spark.sql.*;
    +import org.apache.spark.sql.functions;
    +import org.apache.spark.sql.streaming.StreamingQuery;
    +import scala.Tuple2;
    +
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
    + *   <optional slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap. If
    + * <slide duration> is not provided, it defaults to <window duration>.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <optional slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is 60, 30
    + */
    +public final class JavaStructuredNetworkWordCountWindowed {
    +
    +  public static void main(String[] args) throws Exception {
    +    if (args.length < 3) {
    +      System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration in seconds> <optional slide duration in seconds>");
    --- End diff --
    
    optional stuff is usually written as  `[ ... ]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61635 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61635/consoleFull)** for PR 13957 at commit [`57a8b11`](https://github.com/apache/spark/commit/57a8b11c5949271c50cd338aab10a4322600864e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61639 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61639/consoleFull)** for PR 13957 at commit [`e7a81e1`](https://github.com/apache/spark/commit/e7a81e1a7455047d8f53d183f8f320dd77871882).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69353021
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is 60, 30
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration in seconds> <slide duration in seconds>")
    +      System.exit(1)
    +    }
    +
    +    val host = args(0)
    +    val port = args(1).toInt
    +    val windowSize = args(2).toInt
    +    val slideSize = args(3).toInt
    +    if (slideSize > windowSize) {
    +      System.err.println("<slide duration> must be less than or equal to <window duration>")
    +    }
    +
    +    val spark = SparkSession
    +      .builder
    +      .appName("StructuredNetworkWordCountWindowed")
    +      .getOrCreate()
    +
    +    import spark.implicits._
    +
    +    // Create DataFrame representing the stream of input lines from connection to host:port
    +    val lines = spark.readStream
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load().as[(String, Timestamp)]
    +
    +    // Split the lines into words, retaining timestamps
    +    val words = lines.flatMap(line =>
    +      line._1.split(" ").map(word => (word, line._2))
    +    ).toDF("word", "timestamp")
    +
    +    // Group the data by window and word and compute the count of each group
    +    val windowedCounts = words.groupBy(
    +      window(words.col("timestamp"), s"$windowSize seconds", s"$slideSize seconds"),
    --- End diff --
    
    Also move the "$windowSize seconds" higher up .. similar to the python, so that this piece of code looks simpler, and can be exactly copied over to the guide.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61597 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61597/consoleFull)** for PR 13957 at commit [`3e15cc9`](https://github.com/apache/spark/commit/3e15cc925149c6cf6446b4fa24f266c3f3fd9077).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69352799
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,7 +52,7 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
       private var readThread: Thread = null
     
       @GuardedBy("this")
    -  private var lines = new ArrayBuffer[String]
    +  private var lines = new ArrayBuffer[(String, String)]
    --- End diff --
    
    why cant you store it in the java.sql.time form?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69254704
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is "1 minute",
    + * "30 seconds"
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration> <slide duration>")
    --- End diff --
    
    not clear what is the format of specifying the durations. Either add examples in the usage, or make sure they are in seconds and the user has to specify a single number.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    this is looking good. a few cosmetic changes. can you update the windows operation section in the programming guide to use this code? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69363332
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -92,7 +102,11 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
         val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
         val data = synchronized { lines.slice(startIdx, endIdx) }
         import sqlContext.implicits._
    -    data.toDF("value")
    +    if (includeTimestamp) {
    +      data.toDF("value", "timestamp")
    +    } else {
    +      data.map(_._1).toDF("value")
    --- End diff --
    
    nit: `data.select("value")` does not work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61790/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69363852
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,103 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <optional slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap. If
    + <slide duration> is not provided, it defaults to <window duration>.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <optional slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5 and len(sys.argv) != 4:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <optional slide duration in seconds>")
    --- End diff --
    
    nit: optional args are usually written in square brackets, eg `[<param>]` 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69263155
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is "1 minute",
    + * "30 seconds"
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration> <slide duration>")
    --- End diff --
    
    on second thought might be simpler to just use seconds. less prone to users entering incorrect format.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by jjthomas <gi...@git.apache.org>.
Github user jjthomas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69358134
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
           parameters: Map[String, String]): Source = {
         val host = parameters("host")
         val port = parameters("port").toInt
    -    new TextSocketSource(host, port, sqlContext)
    +    new TextSocketSource(host, port,
    +      parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
    --- End diff --
    
    sourceSchema and createSource are separate functions that are passed the parameters map, so the conversion to Boolean needs to happen in both places. Think it's fine if this throws an error if it's not parseable ... the line above will also throw an error if "port" is not parseable as Int.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61789 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61789/consoleFull)** for PR 13957 at commit [`893f70e`](https://github.com/apache/spark/commit/893f70ec45d936929f89e08127821ea9396a9d35).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69353150
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,102 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <slide duration in seconds>")
    --- End diff --
    
    can you make the slide duration optional in all of these examples?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61597/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69351824
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,102 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <slide duration in seconds>")
    +        print(msg, file=sys.stderr)
    +        exit(-1)
    +
    +    host = sys.argv[1]
    +    port = int(sys.argv[2])
    +    windowSize = int(sys.argv[3])
    +    slideSize = int(sys.argv[4])
    +    if slideSize > windowSize:
    +        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
    +
    +    spark = SparkSession\
    +        .builder\
    +        .appName("StructuredNetworkWordCountWindowed")\
    +        .getOrCreate()
    +
    +    # Create DataFrame representing the stream of input lines from connection to host:port
    +    lines = spark\
    +        .readStream\
    +        .format('socket')\
    +        .option('host', host)\
    +        .option('port', port)\
    +        .option('includeTimestamp', 'true')\
    +        .load()
    +
    +    # Split the lines into words, retaining timestamps
    +    words = lines.select(
    +        # explode turns each item in an array into a separate row
    +        explode(
    +            split(lines.value, ' ')
    --- End diff --
    
    looks too nested, put explode and split in the same line. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61635 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61635/consoleFull)** for PR 13957 at commit [`57a8b11`](https://github.com/apache/spark/commit/57a8b11c5949271c50cd338aab10a4322600864e).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public final class JavaStructuredNetworkWordCountWindowed `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69363665
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
           parameters: Map[String, String]): Source = {
         val host = parameters("host")
         val port = parameters("port").toInt
    -    new TextSocketSource(host, port, sqlContext)
    +    new TextSocketSource(host, port,
    +      parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
    --- End diff --
    
    but its better to have single function that catches that error and prints a better error message like "value of option includeTimestamp cannot be parsed. allowed values are "true" or "false".
    also it needs to be a IllegalArgumentException


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69352313
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -125,7 +137,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
         if (!parameters.contains("port")) {
           throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
         }
    -    ("textSocket", TextSocketSource.SCHEMA)
    +    ("textSocket", if (parameters.getOrElse("includeTimestamp", "false") == "true")
    +      { TextSocketSource.SCHEMA_TIMESTAMP } else { TextSocketSource.SCHEMA_REGULAR })
    --- End diff --
    
    this is hard to read. break into two lines 
    ```
    val schema =  ...
    ("textSocket", schema"
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61789 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61789/consoleFull)** for PR 13957 at commit [`893f70e`](https://github.com/apache/spark/commit/893f70ec45d936929f89e08127821ea9396a9d35).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61638 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61638/consoleFull)** for PR 13957 at commit [`8bb543d`](https://github.com/apache/spark/commit/8bb543dee4afda642f4dac48af3306ae293dd4d7).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61638/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69362732
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.examples.sql.streaming;
    +
    +import org.apache.spark.api.java.function.FlatMapFunction;
    +import org.apache.spark.sql.*;
    +import org.apache.spark.sql.functions;
    +import org.apache.spark.sql.streaming.StreamingQuery;
    +import scala.Tuple2;
    +
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
    + *   <optional slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap. If
    + * <slide duration> is not provided, it defaults to <window duration>.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <optional slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is 60, 30
    + */
    +public final class JavaStructuredNetworkWordCountWindowed {
    +
    +  public static void main(String[] args) throws Exception {
    +    if (args.length < 3) {
    +      System.err.println("Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration in seconds> <optional slide duration in seconds>");
    +      System.exit(1);
    +    }
    +
    +    String host = args[0];
    +    int port = Integer.parseInt(args[1]);
    +    int windowSize = Integer.parseInt(args[2]);
    +    int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
    +    if (slideSize > windowSize) {
    +      System.err.println("<slide duration> must be less than or equal to <window duration>");
    +    }
    +    String windowArg = windowSize + " seconds";
    +    String slideArg = slideSize + " seconds";
    +
    +    SparkSession spark = SparkSession
    +      .builder()
    +      .appName("JavaStructuredNetworkWordCountWindowed")
    +      .getOrCreate();
    +
    +    // Create DataFrame representing the stream of input lines from connection to host:port
    +    Dataset<Tuple2<String, Timestamp>> lines = spark
    +      .readStream()
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
    +
    +    // Split the lines into words, retaining timestamps
    +    Dataset<Row> words = lines.flatMap(
    +      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
    +        @Override
    +        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
    +          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
    +          for (String word : t._1.split(" ")) {
    +            result.add(new Tuple2<>(word, t._2));
    +          }
    +          return result.iterator();
    +        }
    +      },
    +      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
    +    ).toDF("word", "timestamp");
    +
    +    // Group the data by window and word and compute the count of each group
    +    Dataset<Row> windowedCounts = words.groupBy(
    +      functions.window(words.col("timestamp"), windowArg, slideArg),
    --- End diff --
    
    windowArg --> windowDuration
    slideArg --> slideInterval
    
    So that this code snippet in the guide is self explanatory.
    And make sure its consistent in other languages.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69352364
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -125,7 +137,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
         if (!parameters.contains("port")) {
           throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
         }
    -    ("textSocket", TextSocketSource.SCHEMA)
    +    ("textSocket", if (parameters.getOrElse("includeTimestamp", "false") == "true")
    --- End diff --
    
    not checking for case sensitivity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69362387
  
    --- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.examples.sql.streaming;
    +
    +import org.apache.spark.api.java.function.FlatMapFunction;
    +import org.apache.spark.sql.*;
    +import org.apache.spark.sql.functions;
    +import org.apache.spark.sql.streaming.StreamingQuery;
    +import scala.Tuple2;
    +
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: JavaStructuredNetworkWordCountWindowed <hostname> <port> <window duration>
    + *   <optional slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap. If
    + * <slide duration> is not provided, it defaults to <window duration>.
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <optional slide duration>`
    --- End diff --
    
    window duration in seconds...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69052427
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -67,7 +71,9 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
                   return
                 }
                 TextSocketSource.this.synchronized {
    -              lines += line
    +              lines += ((line,
    +                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    --- End diff --
    
    no need to create a format object every time. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61790 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61790/consoleFull)** for PR 13957 at commit [`8f97b66`](https://github.com/apache/spark/commit/8f97b66f55ececd91434a3f9a3b28dd6c5412d46).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69262951
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import java.sql.Timestamp
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +
    +/**
    + * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + * sliding window of configurable duration. Each line from the network is tagged
    + * with a timestamp that is used to determine the windows into which it falls.
    + *
    + * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration> <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming
    + * would connect to receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above. <slide duration> should be less than or equal to
    + * <window duration>. If the two are equal, successive windows have no overlap.
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * One recommended <window duration>, <slide duration> pair is "1 minute",
    + * "30 seconds"
    + */
    +object StructuredNetworkWordCountWindowed {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
    +        " <window duration> <slide duration>")
    +      System.exit(1)
    +    }
    +
    +    val host = args(0)
    +    val port = args(1).toInt
    +    val windowSize = args(2)
    +    val slideSize = args(3)
    +
    +    val spark = SparkSession
    +      .builder
    +      .appName("StructuredNetworkWordCountWindowed")
    +      .getOrCreate()
    +
    +    import spark.implicits._
    +
    +    // Create DataFrame representing the stream of input lines from connection to host:port
    +    val lines = spark.readStream
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load().as[(String, Timestamp)]
    +
    +    // Split the lines into words, retaining timestamps
    +    val words = lines.flatMap(line =>
    +      line._1.split(" ")
    +        .map(word => (word, line._2))
    +    )
    --- End diff --
    
    you could convert this to a DF with nice column name with `.toDF("word",  "time")


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    this looks good for scala, please add the java and python.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69248961
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount.py ---
    @@ -16,7 +16,7 @@
     #
     
     """
    - Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
    + Counts words in UTF8 encoded, '\n' delimited text received from the network.
    --- End diff --
    
    good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61635/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    ok to test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    **[Test build #61639 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/61639/consoleFull)** for PR 13957 at commit [`e7a81e1`](https://github.com/apache/spark/commit/e7a81e1a7455047d8f53d183f8f320dd77871882).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: structured streaming event time window example

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r68907588
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.types.{DoubleType, TimestampType}
    +
    +/**
    + * Computes the average signal from IoT device readings over a sliding window of
    + * configurable duration. The readings are received over the network and must be
    + * UTF8-encoded and separated by '\n'.
    + *
    + * A single reading should take the format
    + * <device name (string)>, <reading (double)>, <time (timestamp)>
    + *
    + * Usage: EventTimeWindow <hostname> <port> <window duration>
    + *   <slide duration> <checkpoint dir>
    + * <hostname> and <port> describe the TCP server that Structured Streaming would connect to
    + * receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.EventTimeWindow
    + *    localhost 9999 <window duration> <slide duration> <checkpoint dir>`
    + *
    + * Type device readings in the format given above into Netcat.
    + *
    + * An example sequence of device readings:
    + * dev0,7.0,2015-03-18T12:00:00
    + * dev1,8.0,2015-03-18T12:00:10
    + * dev0,5.0,2015-03-18T12:00:20
    + * dev1,3.0,2015-03-18T12:00:30
    --- End diff --
    
    i dont like it that the user has to enter data in a particular format. that make this clumsy to use. 
    
    how about this. lets add an option in the socket source, that attaches the receiving time as a column in the data. so the returned schema would be (time, line) if that option is specified. then we can very easily write a simple sliding window word count example using that.
    what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69363771
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala ---
    @@ -85,6 +86,47 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString,
    +      "includeTimestamp" -> "true")
    +    val schema = provider.sourceSchema(sqlContext, None, "", parameters)._2
    +    assert(schema === StructType(StructField("value", StringType) ::
    +      StructField("timestamp", TimestampType) :: Nil))
    +
    +    source = provider.createSource(sqlContext, "", None, "", parameters)
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      while (source.getOffset.isEmpty) {
    +        Thread.sleep(10)
    +      }
    +      val offset1 = source.getOffset.get
    +      val batch1 = source.getBatch(None, offset1)
    +      val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq
    +      assert(batch1Seq.map(_._1) === Seq("hello"))
    +      val batch1Stamp = batch1Seq(0)._2
    +
    +      serverThread.enqueue("world")
    +      while (source.getOffset.get === offset1) {
    +        Thread.sleep(10)
    +      }
    +      val offset2 = source.getOffset.get
    +      val batch2 = source.getBatch(Some(offset1), offset2)
    +      val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq
    +      assert(batch2Seq.map(_._1) === Seq("world"))
    +      val batch2Stamp = batch2Seq(0)._2
    +      assert(!batch2Stamp.before(batch1Stamp))
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      source.stop()
    +      source = null
    +    }
    +  }
    +
    --- End diff --
    
    add tests below to make sure includeTimestamp errors are checked. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/61639/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69053315
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples.sql.streaming
    +
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.types.{DoubleType, TimestampType}
    +
    +/**
    + * Computes the average signal from IoT device readings over a sliding window of
    + * configurable duration. The readings are received over the network and must be
    + * UTF8-encoded and separated by '\n'.
    + *
    + * A single reading should take the format
    + * <device name (string)>, <reading (double)>
    + *
    + * Usage: EventTimeWindow <hostname> <port> <window duration>
    + *   <slide duration>
    + * <hostname> and <port> describe the TCP server that Structured Streaming would connect to
    + * receive data.
    + * <window duration> gives the size of window, specified as integer number of seconds, minutes,
    + * or days, e.g. "1 minute", "2 seconds"
    + * <slide duration> gives the amount of time successive windows are offset from one another,
    + * given in the same units as above
    + * (<window duration> and <slide duration> must be enclosed by quotes to ensure that
    + * they are processed as individual arguments)
    + *
    + * To run this on your local machine, you need to first run a Netcat server
    + *    `$ nc -lk 9999`
    + * and then run the example
    + *    `$ bin/run-example sql.streaming.EventTimeWindow
    + *    localhost 9999 <window duration> <slide duration>`
    + *
    + * Type device readings in the format given above into Netcat.
    + *
    + * An example sequence of device readings:
    + * dev0,7.0
    + * dev1,8.0
    + * dev0,5.0
    + * dev1,3.0
    + */
    +object EventTimeWindow {
    +
    +  def main(args: Array[String]) {
    +    if (args.length < 4) {
    +      System.err.println("Usage: EventTimeWindow <hostname> <port> <window duration>" +
    +        " <slide duration>")
    +      System.exit(1)
    +    }
    +
    +    val host = args(0)
    +    val port = args(1).toInt
    +    val windowSize = args(2)
    +    val slideSize = args(3)
    +
    +    val spark = SparkSession
    +      .builder
    +      .appName("EventTimeWindow")
    +      .getOrCreate()
    +
    +    // Create DataFrame representing the stream of input readings from connection to host:port
    +    val lines = spark.readStream
    +      .format("socket")
    +      .option("host", host)
    +      .option("port", port)
    +      .option("includeTimestamp", true)
    +      .load()
    +
    +    // Split the readings into their individual components
    +    val splitLines = lines.select(
    +      split(lines.col("value"), ",").alias("pieces"),
    --- End diff --
    
    as i said offline this is very complex example. rather just do streaming windowed word count. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: [SPARK-16114] [SQL] structured streaming event time wind...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    LGTM. Merging this to master and 2.0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69363245
  
    --- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
    @@ -0,0 +1,103 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +"""
    + Counts words in UTF8 encoded, '\n' delimited text received from the network over a
    + sliding window of configurable duration. Each line from the network is tagged
    + with a timestamp that is used to determine the windows into which it falls.
    +
    + Usage: structured_network_wordcount_windowed.py <hostname> <port> <window duration>
    +   <optional slide duration>
    + <hostname> and <port> describe the TCP server that Structured Streaming
    + would connect to receive data.
    + <window duration> gives the size of window, specified as integer number of seconds
    + <slide duration> gives the amount of time successive windows are offset from one another,
    + given in the same units as above. <slide duration> should be less than or equal to
    + <window duration>. If the two are equal, successive windows have no overlap. If
    + <slide duration> is not provided, it defaults to <window duration>.
    +
    + To run this on your local machine, you need to first run a Netcat server
    +    `$ nc -lk 9999`
    + and then run the example
    +    `$ bin/spark-submit
    +    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
    +    localhost 9999 <window duration> <optional slide duration>`
    +
    + One recommended <window duration>, <slide duration> pair is 60, 30
    +"""
    +from __future__ import print_function
    +
    +import sys
    +
    +from pyspark.sql import SparkSession
    +from pyspark.sql.functions import explode
    +from pyspark.sql.functions import split
    +from pyspark.sql.functions import window
    +
    +if __name__ == "__main__":
    +    if len(sys.argv) != 5 and len(sys.argv) != 4:
    +        msg = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
    +               "<window duration in seconds> <optional slide duration in seconds>")
    +        print(msg, file=sys.stderr)
    +        exit(-1)
    +
    +    host = sys.argv[1]
    +    port = int(sys.argv[2])
    +    windowSize = int(sys.argv[3])
    +    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
    +    if slideSize > windowSize:
    +        print("<slide duration> must be less than or equal to <window duration>", file=sys.stderr)
    +    windowArg = '{} seconds'.format(windowSize)
    +    slideArg = '{} seconds'.format(slideSize)
    +
    +
    +    spark = SparkSession\
    +        .builder\
    +        .appName("StructuredNetworkWordCountWindowed")\
    +        .getOrCreate()
    +
    +    # Create DataFrame representing the stream of input lines from connection to host:port
    +    lines = spark\
    +        .readStream\
    +        .format('socket')\
    +        .option('host', host)\
    +        .option('port', port)\
    +        .option('includeTimestamp', 'true')\
    +        .load()
    +
    +    # Split the lines into words, retaining timestamps
    +    words = lines.select(
    +        # explode turns each item in an array into a separate row
    +        explode(split(lines.value, ' ')).alias('word'),
    +        lines.timestamp
    +    )
    +
    +    # Group the data by window and word and compute the count of each group
    +    windowedCounts = words.groupBy(
    +        window(words.timestamp, windowArg, slideArg),
    --- End diff --
    
    rename here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13957: structured streaming event time window example

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13957
  
    use the same JIRA number as the previous one.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13957#discussion_r69052686
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -92,7 +99,12 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
         val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
         val data = synchronized { lines.slice(startIdx, endIdx) }
         import sqlContext.implicits._
    -    data.toDF("value")
    --- End diff --
    
    Add unit tests to test these cases in TextSocketStreamSuite


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org