Posted to reviews@spark.apache.org by jerryshao <gi...@git.apache.org> on 2018/01/24 09:07:29 UTC

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

GitHub user jerryshao opened a pull request:

    https://github.com/apache/spark/pull/20382

    [SPARK-23097][SQL][SS] Migrate text socket source to V2

    ## What changes were proposed in this pull request?
    
    This PR migrates the Structured Streaming text socket source to the V2 data source API.
    
    Question: do we need to remove the old "socket" source?
    
    ## How was this patch tested?
    
    Unit tests and manual verification.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerryshao/apache-spark SPARK-23097

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20382
    
----
commit 629243d94835e8eef379122d0c40d5c0b20ea351
Author: jerryshao <ss...@...>
Date:   2018-01-24T08:22:07Z

    Move text socket source to V2

commit 8f3b54824d92123a0e7d468d42db30dba72cded1
Author: jerryshao <ss...@...>
Date:   2018-01-24T09:04:44Z

    Rename to V2

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/699/
    Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87825/
    Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164944650
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    --- End diff --
    
    I would wait for my PR #20445 to go in, where I migrate LongOffset to use OffsetV2.
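
For readers following the removed `getBatch` and `commit` code in the diff above, here is a minimal, Spark-free sketch of the offset arithmetic it relies on. `SocketBuffer` is a hypothetical name, offsets are plain `Long` values rather than `LongOffset`, and the trimming step in `commit` is not visible in the quoted diff, so it is an assumption about how committed rows are dropped.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the in-memory buffer kept by the socket source.
// Offsets are plain Longs here; the quoted code wraps them in LongOffset.
class SocketBuffer {
  // Holds only the rows after lastOffsetCommitted.
  private val batches = new ListBuffer[String]
  private var currentOffset: Long = -1L
  private var lastOffsetCommitted: Long = -1L

  // Called for every line read from the socket.
  def append(line: String): Unit = synchronized {
    currentOffset += 1
    batches.append(line)
  }

  // None until at least one line has arrived.
  def getOffset: Option[Long] = synchronized {
    if (currentOffset == -1L) None else Some(currentOffset)
  }

  // Returns the rows with offsets in (start, end].
  def getBatch(start: Option[Long], end: Long): List[String] = synchronized {
    val startOrdinal = start.getOrElse(-1L).toInt + 1
    val endOrdinal = end.toInt + 1
    // Translate absolute ordinals into indexes of the trimmed buffer.
    val sliceStart = startOrdinal - lastOffsetCommitted.toInt - 1
    val sliceEnd = endOrdinal - lastOffsetCommitted.toInt - 1
    batches.slice(sliceStart, sliceEnd).toList
  }

  // Assumed behavior: drop everything up to and including `end`.
  def commit(end: Long): Unit = synchronized {
    val offsetDiff = (end - lastOffsetCommitted).toInt
    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }
    batches.remove(0, offsetDiff)
    lastOffsetCommitted = end
  }
}
```

Because the buffer is re-based after every commit, a batch requested with the same absolute offsets returns the same rows before and after earlier offsets are committed.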


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163613655
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    +  override def shortName(): String = "socketv2"
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceV2Options): MicroBatchReader = {
    +    logWarning("The socket source should not be used for production applications! " +
    +      "It does not support recovery.")
    +    if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
    +      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    +    }
    +    if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
    +      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    +    }
    +    if (schema.isPresent) {
    +      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    }
    +
    +    if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
    +      Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match {
    +        case Success(bool) =>
    +        case Failure(_) =>
    +          throw new AnalysisException(
    +            "includeTimestamp must be set to either \"true\" or \"false\"")
    +      }
    +    }
    +
    +    new TextSocketStreamMicroBatchReader(options)
    +  }
    +}
    +
    +case class TextSocketStreamOffset(offset: Long) extends Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with Logging {
    +
    +  import TextSocketSourceProviderV2._
    +
    +  private var start: TextSocketStreamOffset = _
    +  private var end: TextSocketStreamOffset = _
    +
    +  private val host = options.get(HOST).get()
    +  private val port = options.get(PORT).get().toInt
    +  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
    +  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
    --- End diff --
    
    To match the old parallelize behavior, the default number of partitions should be sparkContext.defaultParallelism.
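
A minimal sketch of the suggestion above, without any Spark dependency. `PartitionSketch`, `resolveNumPartitions` and `split` are hypothetical names; the `defaultParallelism` argument stands in for `sparkContext.defaultParallelism`, and the round-robin split is an assumption for illustration, since the quoted diff ends before the partitioning code.

```scala
// Hypothetical helpers; none of these names come from the PR.
object PartitionSketch {
  // Use the user-supplied "numPartitions" option when present; otherwise fall
  // back to the supplied default (sparkContext.defaultParallelism in Spark).
  def resolveNumPartitions(userOption: Option[String], defaultParallelism: Int): Int =
    userOption.map(_.toInt).getOrElse(defaultParallelism)

  // Assumed strategy: distribute rows round-robin across the partitions.
  def split[T](rows: Seq[T], numPartitions: Int): Vector[Vector[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val slices = Vector.fill(numPartitions)(Vector.newBuilder[T])
    rows.zipWithIndex.foreach { case (row, idx) => slices(idx % numPartitions) += row }
    slices.map(_.result())
  }
}
```

With this shape, a query that never sets `numPartitions` would get the same partition count that the old `parallelize` call would have picked, while an explicit option still wins.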


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167127039
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         readThread.start()
       }
     
    -  /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    +  override def setOffsetRange(
    +      start: Optional[Offset],
    +      end: Optional[Offset]): Unit = synchronized {
    +    startOffset = start.orElse(LongOffset(-1L))
    +    endOffset = end.orElse(currentOffset)
    +  }
     
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    +  override def getStartOffset(): Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    LongOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    --- End diff --
    
    supernit: is there a need for a variable here?
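
The `setOffsetRange`, `getStartOffset` and `getEndOffset` methods in the diff above follow a simple defaulting pattern, which can be sketched without Spark as follows. `OffsetRangeSketch` is a hypothetical name, Scala `Option[Long]` replaces the `java.util.Optional[Offset]` used in the diff, and the `currentOffset` function is an assumed stand-in for the reader's latest buffered offset.

```scala
// Hypothetical, Spark-free model of the offset-range bookkeeping.
class OffsetRangeSketch(currentOffset: () => Long) {
  private var startOffset: Option[Long] = None
  private var endOffset: Option[Long] = None

  // A missing start defaults to -1 (nothing read yet); a missing end defaults
  // to whatever offset has been buffered so far.
  def setOffsetRange(start: Option[Long], end: Option[Long]): Unit = synchronized {
    startOffset = Some(start.getOrElse(-1L))
    endOffset = Some(end.getOrElse(currentOffset()))
  }

  // Both getters fail loudly if setOffsetRange has not been called yet.
  def getStartOffset: Long =
    startOffset.getOrElse(throw new IllegalStateException("start offset not set"))

  def getEndOffset: Long =
    endOffset.getOrElse(throw new IllegalStateException("end offset not set"))
}
```

Calling `setOffsetRange(None, None)` repeatedly therefore re-reads the latest buffered offset each time, which is what a polling loop needs in order to observe newly arrived data.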


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1188/
    Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128861
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    --- End diff --
    
    Why does this show up as a new file? Was this not a "git mv"? Something went wrong; I would prefer to see a simple diff. Not much should change in the tests.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    @jerryshao any updates?


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1052/
    Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Jenkins, retest this please.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87199 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87199/testReport)** for PR 20382 at commit [`fdc9b9c`](https://github.com/apache/spark/commit/fdc9b9c8a1dcc749be97cfd1c46a502c33bf4bb9).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    @zsxwing @tdas could you please help review this? Thanks!


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1055/
    Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Jenkins, retest this please.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87371/
    Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87202/
    Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/837/
    Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Right, that makes sense. LGTM


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    @jerryshao ping on this. 


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86671/
    Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r168137288
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{IOException, OutputStreamWriter}
    +import java.net.ServerSocket
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  test("V2 basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(
    +      Map("host" -> "localhost", "port" -> serverThread.port.toString).asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +
    +        batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2))
    +        val both = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            both.append(r.get())
    +          }
    +        }
    +        assert(both.map(_.getAs[String](0)) === Seq("hello", "world"))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(Map("host" -> "localhost",
    +      "port" -> serverThread.port.toString, "includeTimestamp" -> "true").asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) ::
    +      StructField("timestamp", TimestampType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +        val batch1Stamp = batch1.map(_.getAs[Timestamp](1)).head
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +        val batch2Stamp = batch2.map(_.getAs[Timestamp](1)).head
    +        assert(!batch2Stamp.before(batch1Stamp))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    +  }
    +
    --- End diff --
    
    What happened to the input row metrics test?


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164930581
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with TextSocketReader with Logging {
    +
    +  private var startOffset: TextSocketOffset = _
    +  private var endOffset: TextSocketOffset = _
    +
    +  protected val host: String = options.get("host").get()
    +  protected val port: Int = options.get("port").get().toInt
     
    -    batches.trimStart(offsetDiff)
    -    lastOffsetCommitted = newOffset
    +  initialize()
    +
    +  override def setOffsetRange(start: Optional[V2Offset], end: Optional[V2Offset]): Unit = {
    +    startOffset = start.orElse(TextSocketOffset(-1L)).asInstanceOf[TextSocketOffset]
    +    endOffset = end.orElse(
    +      TextSocketOffset(getOffsetInternal.getOrElse(-1L))).asInstanceOf[TextSocketOffset]
       }
     
    -  /** Stop this source. */
    -  override def stop(): Unit = synchronized {
    -    if (socket != null) {
    -      try {
    -        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    -        // stop the readThread is to close the socket.
    -        socket.close()
    -      } catch {
    -        case e: IOException =>
    -      }
    -      socket = null
    +  override def getStartOffset(): V2Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): V2Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): V2Offset = {
    +    TextSocketOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val rawList = getBatchInternal(Option(startOffset.offset), Option(endOffset.offset))
    +
    +    assert(SparkSession.getActiveSession.isDefined)
    +    val spark = SparkSession.getActiveSession.get
    +    val numPartitions = spark.sparkContext.defaultParallelism
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
         }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def commit(end: V2Offset): Unit = {
    +    val newOffset = end.asInstanceOf[TextSocketOffset]
    +    commitInternal(newOffset.offset)
    +  }
    +
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
    --- End diff --
    
    Why do we still need StreamSourceProvider?


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87825 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87825/testReport)** for PR 20382 at commit [`6d38bed`](https://github.com/apache/spark/commit/6d38bed38f01a6a3919e07587a00279ec4388f23).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    It's unfortunate that the socket tests don't actually run streams end to end, but I think that's orthogonal to this PR.
    
    Can you run one of the programming guide examples that uses the socket source (e.g. org.apache.spark.examples.sql.streaming.StructuredSessionization) to make sure it works after this PR? If it does, LGTM.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87199 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87199/testReport)** for PR 20382 at commit [`fdc9b9c`](https://github.com/apache/spark/commit/fdc9b9c8a1dcc749be97cfd1c46a502c33bf4bb9).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader with Logging `


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87203/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164930523
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
         println("-------------------------------------------")
         // scalastyle:off println
         spark
    -      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
    +      .createDataFrame(rows.toList.asJava, schema)
    --- End diff --
    
    This fix should go into the 2.3 branch. Thanks for catching this.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87825 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87825/testReport)** for PR 20382 at commit [`6d38bed`](https://github.com/apache/spark/commit/6d38bed38f01a6a3919e07587a00279ec4388f23).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87372 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87372/testReport)** for PR 20382 at commit [`f3fc90c`](https://github.com/apache/spark/commit/f3fc90cc94210f313861625b5a8fe6ef754c05bd).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    LGTM. Merging to master. 


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87831/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163755636
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    --- End diff --
    
    The idea is that the existing TextSocketSourceProvider will have the MicroBatchReadSupport implementation here, in addition to the StreamSourceProvider implementation it already has.
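
A minimal sketch of the layout described in this comment: one provider class that carries both the V1 and the V2 entry points. The traits `V1StreamSourceProvider` and `V2MicroBatchReadSupport`, the class `SketchSocketProvider`, and the string return values are hypothetical stand-ins so the example runs without Spark; the real `StreamSourceProvider` and `MicroBatchReadSupport` interfaces have richer signatures.

```scala
// Hypothetical stand-in for the V1 interface (not the real Spark trait).
trait V1StreamSourceProvider {
  def createSource(host: String, port: Int): String
}

// Hypothetical stand-in for the V2 interface (not the real Spark trait).
trait V2MicroBatchReadSupport {
  def createMicroBatchReader(options: Map[String, String]): String
}

// A single provider exposing both paths, so either engine code path can use it.
class SketchSocketProvider extends V1StreamSourceProvider with V2MicroBatchReadSupport {

  // V1 path: parameters arrive already parsed.
  override def createSource(host: String, port: Int): String =
    s"v1-source[$host:$port]"

  // V2 path: parameters arrive as string options and are validated here.
  override def createMicroBatchReader(options: Map[String, String]): String = {
    val host = options.getOrElse("host",
      throw new IllegalArgumentException("host is required"))
    val port = options.getOrElse("port",
      throw new IllegalArgumentException("port is required")).toInt
    s"v2-reader[$host:$port]"
  }
}

object SketchSocketProviderDemo {
  def main(args: Array[String]): Unit = {
    val provider = new SketchSocketProvider
    println(provider.createSource("localhost", 9999))
    println(provider.createMicroBatchReader(Map("host" -> "localhost", "port" -> "9999")))
  }
}
```

Keeping both implementations on one class means the registered short name for the source can stay the same while callers move from the V1 path to the V2 path.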


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87203 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87203/testReport)** for PR 20382 at commit [`874c91c`](https://github.com/apache/spark/commit/874c91c41942972cabb85be175f929fc62e74af7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87372/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163753088
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    --- End diff --
    
    @jose-torres, do you mean that instead of creating a new V2 socket source, I should modify the current V1 socket source to make it work with V2? Am I understanding correctly?


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87659/testReport)** for PR 20382 at commit [`5011372`](https://github.com/apache/spark/commit/501137269c983e4d028eba817d1c5f45a305171d).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86581 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86581/testReport)** for PR 20382 at commit [`8f3b548`](https://github.com/apache/spark/commit/8f3b54824d92123a0e7d468d42db30dba72cded1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128044
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         }
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
    --- End diff --
    
    This shows up in the StreamingQueryProgressEvent as description, so it may be better to have it as "TextSocket[..."
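
A minimal sketch of the suggested description string. The class name `SocketReaderSketch` is hypothetical and stands in for the reader in the diff; only the `toString` format reflects the suggestion above.

```scala
// Hypothetical stand-in class; only the description string is of interest here.
class SocketReaderSketch(host: String, port: Int) {
  // A source-neutral prefix keeps the reported description stable
  // even if the implementing class is renamed later.
  override def toString: String = s"TextSocket[host: $host, port: $port]"
}
```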


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87664 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87664/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163616762
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    +  override def shortName(): String = "socketv2"
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceV2Options): MicroBatchReader = {
    +    logWarning("The socket source should not be used for production applications! " +
    +      "It does not support recovery.")
    +    if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
    +      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    +    }
    +    if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
    +      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    +    }
    +    if (schema.isPresent) {
    +      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    }
    +
    +    if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
    +      Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match {
    +        case Success(bool) =>
    +        case Failure(_) =>
    +          throw new AnalysisException(
    +            "includeTimestamp must be set to either \"true\" or \"false\"")
    +      }
    +    }
    +
    +    new TextSocketStreamMicroBatchReader(options)
    +  }
    +}
    +
    +case class TextSocketStreamOffset(offset: Long) extends Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with Logging {
    +
    +  import TextSocketSourceProviderV2._
    +
    +  private var start: TextSocketStreamOffset = _
    +  private var end: TextSocketStreamOffset = _
    +
    +  private val host = options.get(HOST).get()
    +  private val port = options.get(PORT).get().toInt
    +  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
    +  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val batches = new ListBuffer[(String, Timestamp)]
    +
    +  private val currentOffset = new AtomicLong(-1L)
    +
    +  private var initialized = false
    +
    +  @GuardedBy("this")
    +  private var lastOffsetCommitted: Long = -1L
    +
    +  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +
    +    this.start = start.orElse(TextSocketStreamOffset(-1L)).asInstanceOf[TextSocketStreamOffset]
    +    this.end =
    +      end.orElse(TextSocketStreamOffset(currentOffset.get())).asInstanceOf[TextSocketStreamOffset]
    +  }
    +
    +  override def getStartOffset(): Offset = {
    +    Option(start).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): Offset = {
    +    Option(end).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketStreamOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      SCHEMA_TIMESTAMP
    +    } else {
    +      SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val startOrdinal = start.offset.toInt + 1
    +    val endOrdinal = end.offset.toInt + 1
    +    val sliceStart = startOrdinal - lastOffsetCommitted.toInt - 1
    +    val sliceEnd = endOrdinal - lastOffsetCommitted.toInt - 1
    +
    +    val rawList = TextSocketStreamMicroBatchReader.this.synchronized {
    +      batches.slice(sliceStart, sliceEnd)
    +    }
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
    +    }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val newOffset = end.asInstanceOf[TextSocketStreamOffset]
    +    val offsetDiff = (newOffset.offset - lastOffsetCommitted).toInt
    --- End diff --
    
    nit: conversion to int is unnecessary


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167126475
  
    --- Diff: sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---
    @@ -5,6 +5,6 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
     org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
     org.apache.spark.sql.execution.datasources.text.TextFileFormat
     org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
    -org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
     org.apache.spark.sql.execution.streaming.RateSourceProvider
    +org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
    --- End diff --
    
    Can you add a redirection in `DataSource.backwardCompatibilityMap` for this?
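
One way such a redirection could look, as a standalone sketch. The two provider class names are taken from the diff above; the object `BackwardCompatSketch` and its `resolve` helper are hypothetical and are not Spark's actual code.

```scala
object BackwardCompatSketch {
  // Old and new fully qualified provider names, as they appear in the diff.
  private val oldProvider =
    "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider"
  private val newProvider =
    "org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider"

  // Hypothetical stand-in for the redirect table: old name -> new name.
  val backwardCompatibilityMap: Map[String, String] = Map(oldProvider -> newProvider)

  // Resolve a user-supplied provider name, following a redirect if one exists.
  def resolve(provider: String): String =
    backwardCompatibilityMap.getOrElse(provider, provider)
}
```

With a mapping like this, a query that still names the old class would be routed to the relocated provider, while unknown names pass through unchanged.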


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/701/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128691
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---
    @@ -177,11 +177,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
               Optional.ofNullable(userSpecifiedSchema.orNull),
               Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
               options)
    +        val schema = tempReader.readSchema()
    +        // Stop tempReader to avoid side-affect thing
    +        tempReader.stop()
    --- End diff --
    
    I feel like this needs a try/finally approach as well.
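
A minimal sketch of the try/finally shape being asked for. The `StoppableReader` trait and `schemaOf` helper are hypothetical simplifications; the real reader interface has more methods and returns a `StructType` rather than a `String`.

```scala
object TempReaderSketch {
  // Hypothetical minimal reader interface, reduced to the two calls in the diff.
  trait StoppableReader {
    def readSchema(): String
    def stop(): Unit
  }

  // Read the schema and always stop the reader, even if readSchema() throws.
  def schemaOf(reader: StoppableReader): String = {
    try {
      reader.readSchema()
    } finally {
      reader.stop()
    }
  }
}
```

The point of the `finally` block is that a temporary reader which has already opened a socket or started a thread is released on the failure path too.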


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87819/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Aah okay. Thanks for letting me know.



---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86640 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86640/testReport)** for PR 20382 at commit [`56c60f3`](https://github.com/apache/spark/commit/56c60f3d9d920cea095e78695544b371435ca6f5).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/258/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171225732
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there" +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    --- End diff --
    
    There is a slim chance that batch2Stamp will be the same as batch1Stamp.
    It may be worth adding a sleep(10) between the two batches to ensure they differ.
    You should also compare batch1Stamp against a timestamp taken directly before the query starts; otherwise the test may pass even if the query generated batch1Stamp = -1 and batch2Stamp = -2.
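
A sketch of the stronger check described above, written as a standalone helper. The names `TimestampCheckSketch`, `stampsAreSane`, and `before` are hypothetical; in the suite, `before` would be captured just ahead of `testStream`.

```scala
import java.sql.Timestamp

object TimestampCheckSketch {
  // True when batch 1 was stamped at or after `before`
  // and batch 2 was not stamped earlier than batch 1.
  def stampsAreSane(before: Timestamp, batch1: Timestamp, batch2: Timestamp): Boolean =
    !batch1.before(before) && !batch2.before(batch1)
}
```

Anchoring the first stamp to a wall-clock lower bound rules out bogus values that happen to be ordered correctly relative to each other.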



---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86581 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86581/testReport)** for PR 20382 at commit [`8f3b548`](https://github.com/apache/spark/commit/8f3b54824d92123a0e7d468d42db30dba72cded1).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Hi @tdas, would you please help review this again? Thanks!


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87371 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87371/testReport)** for PR 20382 at commit [`647c5cd`](https://github.com/apache/spark/commit/647c5cdd1e3cb4138b597bd429e01308f50468a6).


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164944324
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    --- End diff --
    
    Please update the docs accordingly! This is not a source, but a base interface used by two source implementations.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171225853
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there" +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    --- End diff --
    
    assert on the message.
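
A minimal sketch of what asserting on the message could look like. It uses only the Scala standard library so that it runs on its own: `interceptIO` is a hypothetical stand-in for ScalaTest's `intercept`, and `connectTo` with its error text is made up for illustration, not the message the socket source actually produces.

```scala
import java.io.IOException
import scala.util.{Failure, Success, Try}

object MessageAssertSketch {
  // Hypothetical stand-in for ScalaTest's intercept[IOException] { ... }:
  // runs the body and returns the IOException it throws, failing otherwise.
  def interceptIO(body: => Any): IOException = Try(body) match {
    case Failure(e: IOException) => e
    case Failure(other) => throw new AssertionError(s"expected IOException, got $other")
    case Success(_) => throw new AssertionError("expected IOException, nothing was thrown")
  }

  // Hypothetical connection attempt that always fails; the message text is an assumption.
  def connectTo(host: String, port: Int): Unit =
    throw new IOException(s"Connection refused: $host:$port")

  def main(args: Array[String]): Unit = {
    val e = interceptIO(connectTo("localhost", 0))
    // Checking the message keeps an unrelated IOException from satisfying the test.
    assert(e.getMessage.contains("Connection refused"))
  }
}
```

In the suite itself the same idea would presumably be `val e = intercept[IOException] { ... }` followed by an assertion on `e.getMessage`, as the "user-specified schema given" test already does for `AnalysisException`.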


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1179/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164943885
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with TextSocketReader with Logging {
    +
    +  private var startOffset: TextSocketOffset = _
    +  private var endOffset: TextSocketOffset = _
    +
    +  protected val host: String = options.get("host").get()
    +  protected val port: Int = options.get("port").get().toInt
     
    -    batches.trimStart(offsetDiff)
    -    lastOffsetCommitted = newOffset
    +  initialize()
    +
    +  override def setOffsetRange(start: Optional[V2Offset], end: Optional[V2Offset]): Unit = {
    +    startOffset = start.orElse(TextSocketOffset(-1L)).asInstanceOf[TextSocketOffset]
    +    endOffset = end.orElse(
    +      TextSocketOffset(getOffsetInternal.getOrElse(-1L))).asInstanceOf[TextSocketOffset]
       }
     
    -  /** Stop this source. */
    -  override def stop(): Unit = synchronized {
    -    if (socket != null) {
    -      try {
    -        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    -        // stop the readThread is to close the socket.
    -        socket.close()
    -      } catch {
    -        case e: IOException =>
    -      }
    -      socket = null
    +  override def getStartOffset(): V2Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): V2Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): V2Offset = {
    +    TextSocketOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val rawList = getBatchInternal(Option(startOffset.offset), Option(endOffset.offset))
    +
    +    assert(SparkSession.getActiveSession.isDefined)
    +    val spark = SparkSession.getActiveSession.get
    +    val numPartitions = spark.sparkContext.defaultParallelism
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
         }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def commit(end: V2Offset): Unit = {
    +    val newOffset = end.asInstanceOf[TextSocketOffset]
    +    commitInternal(newOffset.offset)
    +  }
    +
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
    --- End diff --
    
    TD and I discussed this offline. It should be fine to remove the V1 StreamSourceProvider implementation, because:
    
    * this isn't a production-quality source, so users shouldn't need to fall back to it
    * this source won't be particularly useful for exercising the V1 execution pipeline once we transition all sources to V2


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171469866
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there" +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    --- End diff --
    
    Hi @tdas, what do you mean by "you should also check batch1stamp with timestamp taken directly before the query"? I'm not sure what specifically you are pointing to.
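
One possible reading of the suggestion, sketched with the standard library only: record a timestamp immediately before starting the query and check that the first batch's stamp is not earlier than it. Because `DATE_FORMAT` in the source uses `yyyy-MM-dd HH:mm:ss`, row timestamps carry whole-second granularity, so the reference stamp is truncated the same way. `truncateToSeconds` is a hypothetical helper, and this interpretation is an assumption that the thread does not confirm.

```scala
import java.sql.Timestamp

object TimestampCheckSketch {
  // Hypothetical helper: drops the millisecond part, mimicking the whole-second
  // granularity that a "yyyy-MM-dd HH:mm:ss" format imposes on row timestamps.
  def truncateToSeconds(millis: Long): Timestamp =
    new Timestamp((millis / 1000L) * 1000L)

  def main(args: Array[String]): Unit = {
    // Taken directly before the query would be started.
    val beforeQuery = truncateToSeconds(System.currentTimeMillis())

    // Stands in for rows.head.getAs[Timestamp](1) from the first batch.
    val batch1Stamp = truncateToSeconds(System.currentTimeMillis())

    // The first batch must not be stamped earlier than the pre-query reference.
    assert(!batch1Stamp.before(beforeQuery))
  }
}
```

Without the truncation, a reference stamp with milliseconds could compare as later than a row stamped within the same second, which would make the check flaky.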


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164934753
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
         println("-------------------------------------------")
         // scalastyle:off println
         spark
    -      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
    +      .createDataFrame(rows.toList.asJava, schema)
    --- End diff --
    
    OK, I will create a separate PR for this small fix.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    #20445 will be merged in a few hours. Please go ahead and update your PR with the refactoring that was suggested (mainly, no V1 version).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87370/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167126569
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -15,40 +15,48 @@
      * limitations under the License.
      */
     
    -package org.apache.spark.sql.execution.streaming
    +package org.apache.spark.sql.execution.streaming.sources
     
     import java.io.{BufferedReader, InputStreamReader, IOException}
     import java.net.Socket
     import java.sql.Timestamp
     import java.text.SimpleDateFormat
    -import java.util.{Calendar, Locale}
    +import java.util.{Calendar, List => JList, Locale, Optional}
     import javax.annotation.concurrent.GuardedBy
     
    +import scala.collection.JavaConverters._
     import scala.collection.mutable.ListBuffer
     import scala.util.{Failure, Success, Try}
     
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql._
    -import org.apache.spark.sql.catalyst.InternalRow
    -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
     import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    -import org.apache.spark.unsafe.types.UTF8String
     
    -
    -object TextSocketSource {
    +object TextSocketMicroBatchReader {
       val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
       val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
         StructField("timestamp", TimestampType) :: Nil)
       val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
     }
     
     /**
    - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
    - * This source will *not* work in production applications due to multiple reasons, including no
    - * support for fault recovery and keeping all of the text read in memory forever.
    + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
    --- End diff --
    
    nit: tutorials -> testing (I know it was like that, but let's fix it since we are changing it anyway)


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128652
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---
    @@ -177,11 +177,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
               Optional.ofNullable(userSpecifiedSchema.orNull),
               Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
               options)
    +        val schema = tempReader.readSchema()
    +        // Stop tempReader to avoid side-affect thing
    --- End diff --
    
    nit: side-affect -> side-effect.
    
    good catch.
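
The pattern in this hunk is to create a temporary reader only to learn the schema, then stop it so that it does not keep a connection or thread alive. It can be sketched as follows. `SchemaReader`, `FakeSocketReader`, and `schemaOf` are hypothetical stand-ins, not Spark's `MicroBatchReader` API, and the `try`/`finally` is a choice made for this sketch rather than something shown in the diff.

```scala
object TempReaderSketch {
  // Hypothetical minimal reader interface that holds a resource to release.
  trait SchemaReader {
    def readSchema(): Seq[String]
    def stop(): Unit
  }

  // Hypothetical reader; a real one would open a socket on construction.
  // Here it only records whether stop() was called.
  final class FakeSocketReader extends SchemaReader {
    @volatile var stopped: Boolean = false
    override def readSchema(): Seq[String] = Seq("value")
    override def stop(): Unit = { stopped = true }
  }

  // Read the schema, then always stop the temporary reader,
  // even if readSchema() throws.
  def schemaOf(reader: SchemaReader): Seq[String] =
    try reader.readSchema() finally reader.stop()

  def main(args: Array[String]): Unit = {
    val reader = new FakeSocketReader
    val schema = schemaOf(reader)
    assert(schema == Seq("value"))
    assert(reader.stopped)
  }
}
```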


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/187/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87660/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1046/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87202 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87202/testReport)** for PR 20382 at commit [`874c91c`](https://github.com/apache/spark/commit/874c91c41942972cabb85be175f929fc62e74af7).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171954250
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there " +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    --- End diff --
    
    That's fine.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86640/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86677/testReport)** for PR 20382 at commit [`9ceb3be`](https://github.com/apache/spark/commit/9ceb3be4a5a7a451ae740cf563792636f879a81b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Sure, I will do it today.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/838/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Jenkins, retest this please.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87199/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164944276
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with TextSocketReader with Logging {
    +
    +  private var startOffset: TextSocketOffset = _
    +  private var endOffset: TextSocketOffset = _
    +
    +  protected val host: String = options.get("host").get()
    +  protected val port: Int = options.get("port").get().toInt
     
    -    batches.trimStart(offsetDiff)
    -    lastOffsetCommitted = newOffset
    +  initialize()
    +
    +  override def setOffsetRange(start: Optional[V2Offset], end: Optional[V2Offset]): Unit = {
    +    startOffset = start.orElse(TextSocketOffset(-1L)).asInstanceOf[TextSocketOffset]
    +    endOffset = end.orElse(
    +      TextSocketOffset(getOffsetInternal.getOrElse(-1L))).asInstanceOf[TextSocketOffset]
       }
     
    -  /** Stop this source. */
    -  override def stop(): Unit = synchronized {
    -    if (socket != null) {
    -      try {
    -        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    -        // stop the readThread is to close the socket.
    -        socket.close()
    -      } catch {
    -        case e: IOException =>
    -      }
    -      socket = null
    +  override def getStartOffset(): V2Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): V2Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): V2Offset = {
    +    TextSocketOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val rawList = getBatchInternal(Option(startOffset.offset), Option(endOffset.offset))
    +
    +    assert(SparkSession.getActiveSession.isDefined)
    +    val spark = SparkSession.getActiveSession.get
    +    val numPartitions = spark.sparkContext.defaultParallelism
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
         }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def commit(end: V2Offset): Unit = {
    +    val newOffset = end.asInstanceOf[TextSocketOffset]
    +    commitInternal(newOffset.offset)
    +  }
    +
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
    --- End diff --
    
    OK, I will update the patch accordingly.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87659/testReport)** for PR 20382 at commit [`5011372`](https://github.com/apache/spark/commit/501137269c983e4d028eba817d1c5f45a305171d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171510614
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there " +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    --- End diff --
    
    val timestamp = System.currentTimeMillis
    testStream(...)(
      // get batch1Stamp
    )
    // assert batch1Stamp.getTime >= timestamp
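
    A minimal sketch of the check suggested above, assuming plain `java.sql.Timestamp` values instead of the `testStream` harness. `TimestampBounds` and `stampsAreSane` are hypothetical names used only for illustration; they are not part of the PR.

```scala
import java.sql.Timestamp

object TimestampBounds {
  // True when every stamp is at or after `lowerBoundMs` (captured before the
  // stream starts) and the stamps never go backwards from one batch to the next.
  def stampsAreSane(lowerBoundMs: Long, stamps: Seq[Timestamp]): Boolean = {
    val notBeforeStart = stamps.forall(_.getTime >= lowerBoundMs)
    val nonDecreasing = stamps.zip(stamps.drop(1)).forall { case (a, b) => !b.before(a) }
    notBeforeStart && nonDecreasing
  }
}
```

    In a test, the lower bound would be read from `System.currentTimeMillis` before the stream starts, and the collected batch timestamps passed in afterwards.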



---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20382


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    A relevant test failed. Please make sure that the tests are not flaky.
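
    One common source of flakiness in socket-based tests is waiting a fixed amount of time for data to arrive. The sketch below shows bounded polling as an alternative; it is an illustration only and is not known to be the cause of the failure mentioned here. `PollSketch` and `pollUntil` are hypothetical names, not Spark test utilities.

```scala
object PollSketch {
  // Re-evaluates `condition` every `intervalMs` until it holds or `timeoutMs`
  // has elapsed. Returns true if the condition held before the deadline.
  def pollUntil(timeoutMs: Long, intervalMs: Long = 10L)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && deadline - System.nanoTime() > 0) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }
}
```

    A test would then assert on the returned flag, so a slow machine fails with a clear timeout rather than an unrelated assertion.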


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163730153
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    +  override def shortName(): String = "socketv2"
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceV2Options): MicroBatchReader = {
    +    logWarning("The socket source should not be used for production applications! " +
    +      "It does not support recovery.")
    +    if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
    +      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    +    }
    +    if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
    +      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    +    }
    +    if (schema.isPresent) {
    +      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    }
    +
    +    if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
    +      Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match {
    +        case Success(bool) =>
    +        case Failure(_) =>
    +          throw new AnalysisException(
    +            "includeTimestamp must be set to either \"true\" or \"false\"")
    +      }
    +    }
    +
    +    new TextSocketStreamMicroBatchReader(options)
    +  }
    +}
    +
    +case class TextSocketStreamOffset(offset: Long) extends Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with Logging {
    +
    +  import TextSocketSourceProviderV2._
    +
    +  private var start: TextSocketStreamOffset = _
    +  private var end: TextSocketStreamOffset = _
    +
    +  private val host = options.get(HOST).get()
    +  private val port = options.get(PORT).get().toInt
    +  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
    +  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val batches = new ListBuffer[(String, Timestamp)]
    +
    +  private val currentOffset = new AtomicLong(-1L)
    +
    +  private var initialized = false
    +
    +  @GuardedBy("this")
    +  private var lastOffsetCommitted: Long = -1L
    +
    +  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    +    if (!initialized) {
    --- End diff --
    
    I don't think this will solve that problem, since each reader will just have its own `initialized` flag.
    
    In general, I think it's fine if we do a bit of extra work. V1 sources do have to support being created multiple times (in e.g. restart scenarios), and the lifecycles of the two V2 readers being created here don't overlap. (We should be closing the tempReader created in DataStreamReader, though.)
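
    A minimal sketch of closing a temporary reader once its schema has been read, assuming a simplified reader type. `TempReaderSketch`, `SketchReader`, and `schemaViaTempReader` are hypothetical names for illustration; the real `DataStreamReader` code is not shown in this thread.

```scala
object TempReaderSketch {
  // Hypothetical reader that records whether stop() was called.
  class SketchReader {
    @volatile private var stopped = false
    def readSchema(): Seq[String] = Seq("value")
    def stop(): Unit = { stopped = true }
    def isStopped: Boolean = stopped
  }

  // Reads the schema from a throwaway reader and always stops it afterwards,
  // even if readSchema() throws.
  def schemaViaTempReader(create: () => SketchReader): Seq[String] = {
    val temp = create()
    try temp.readSchema() finally temp.stop()
  }
}
```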


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163725096
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    +  override def shortName(): String = "socketv2"
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceV2Options): MicroBatchReader = {
    +    logWarning("The socket source should not be used for production applications! " +
    +      "It does not support recovery.")
    +    if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
    +      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    +    }
    +    if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
    +      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    +    }
    +    if (schema.isPresent) {
    +      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    }
    +
    +    if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
    +      Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match {
    +        case Success(bool) =>
    +        case Failure(_) =>
    +          throw new AnalysisException(
    +            "includeTimestamp must be set to either \"true\" or \"false\"")
    +      }
    +    }
    +
    +    new TextSocketStreamMicroBatchReader(options)
    +  }
    +}
    +
    +case class TextSocketStreamOffset(offset: Long) extends Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with Logging {
    +
    +  import TextSocketSourceProviderV2._
    +
    +  private var start: TextSocketStreamOffset = _
    +  private var end: TextSocketStreamOffset = _
    +
    +  private val host = options.get(HOST).get()
    +  private val port = options.get(PORT).get().toInt
    +  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
    +  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val batches = new ListBuffer[(String, Timestamp)]
    +
    +  private val currentOffset = new AtomicLong(-1L)
    +
    +  private var initialized = false
    +
    +  @GuardedBy("this")
    +  private var lastOffsetCommitted: Long = -1L
    +
    +  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    +    if (!initialized) {
    --- End diff --
    
    This is the point I wanted to raise. Originally I initialized this in the constructor, like the old socket source. But I found that `MicroBatchReader` is created in two different places, as two separate objects, so initializing in the constructor would create two socket threads and two connections. This differs from the V1 source, which is created only once. With V2, two `MicroBatchReader` objects are created in two different places (one of them only for the schema), which means any side-effecting action in the constructor happens twice. Ideally we should create the `MicroBatchReader` only once.
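
    A minimal sketch of the deferred initialization described above, assuming a simplified reader with no real socket. `LazyInitSketch`, `SketchReader`, and `connectionsOpened` are hypothetical names; the counter stands in for opening the socket and starting the read thread.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyInitSketch {
  // Counts how many times the simulated connection was opened.
  val connectionsOpened = new AtomicInteger(0)

  // Hypothetical reader: constructing it has no side effects.
  class SketchReader {
    private var initialized = false

    // The schema can be answered without touching the network.
    def readSchema(): Seq[String] = Seq("value")

    // The connection is opened on the first call only.
    def setOffsetRange(): Unit = synchronized {
      if (!initialized) {
        connectionsOpened.incrementAndGet() // stands in for connecting and starting the read thread
        initialized = true
      }
    }
  }
}
```

    With this shape, an instance created only to read the schema never opens a connection, while the instance that actually runs opens exactly one.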


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r170178735
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{IOException, OutputStreamWriter}
    +import java.net.ServerSocket
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  test("V2 basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(
    +      Map("host" -> "localhost", "port" -> serverThread.port.toString).asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +
    +        batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2))
    +        val both = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            both.append(r.get())
    +          }
    +        }
    +        assert(both.map(_.getAs[String](0)) === Seq("hello", "world"))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(Map("host" -> "localhost",
    +      "port" -> serverThread.port.toString, "includeTimestamp" -> "true").asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) ::
    +      StructField("timestamp", TimestampType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +        val batch1Stamp = batch1.map(_.getAs[Timestamp](1)).head
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +        val batch2Stamp = batch2.map(_.getAs[Timestamp](1)).head
    +        assert(!batch2Stamp.before(batch1Stamp))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    +  }
    +
    --- End diff --
    
    Because of the changes to the data source APIs, it is currently hard to get the DataFrame's metrics through the V2 API, which is why I deleted this test.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r170759995
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{IOException, OutputStreamWriter}
    +import java.net.ServerSocket
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  test("V2 basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(
    +      Map("host" -> "localhost", "port" -> serverThread.port.toString).asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +
    +        batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2))
    +        val both = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            both.append(r.get())
    +          }
    +        }
    +        assert(both.map(_.getAs[String](0)) === Seq("hello", "world"))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val provider = new TextSocketSourceProvider
    +    val options = new DataSourceOptions(Map("host" -> "localhost",
    +      "port" -> serverThread.port.toString, "includeTimestamp" -> "true").asJava)
    +    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
    +
    +    val schema = batchReader.readSchema()
    +    assert(schema === StructType(StructField("value", StringType) ::
    +      StructField("timestamp", TimestampType) :: Nil))
    +
    +    failAfter(streamingTimeout) {
    +      serverThread.enqueue("hello")
    +      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
    +        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
    +        Thread.sleep(10)
    +      }
    +      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +        val offset1 = batchReader.getEndOffset
    +        val batch1 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch1.append(r.get())
    +          }
    +        }
    +        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
    +        val batch1Stamp = batch1.map(_.getAs[Timestamp](1)).head
    +
    +        serverThread.enqueue("world")
    +        while (batchReader.getEndOffset === offset1) {
    +          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
    +          Thread.sleep(10)
    +        }
    +        val offset2 = batchReader.getEndOffset
    +        val batch2 = new ListBuffer[Row]
    +        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
    +          while (r.next()) {
    +            batch2.append(r.get())
    +          }
    +        }
    +        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
    +        val batch2Stamp = batch2.map(_.getAs[Timestamp](1)).head
    +        assert(!batch2Stamp.before(batch1Stamp))
    +      }
    +
    +      // Try stopping the source to make sure this does not block forever.
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    +  }
    +
    --- End diff --
    
    The test below seems like a good replacement to me.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86581/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87664/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87664 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87664/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87831 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87831/testReport)** for PR 20382 at commit [`762f1da`](https://github.com/apache/spark/commit/762f1da952eae99fc7b377a08267c0d4cdaf00ee).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    @jose-torres, could you please help review this? Thanks!


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163612181
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    --- End diff --
    
    The intent is for the V2 and V1 source to live in the same register, so existing queries can start using the V2 source with no change needed. This also allows the V2 implementation to be validated by passing all the old tests.
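
One possible reading of the comment above, sketched under stated assumptions: a single provider is registered once under the existing short name and exposes both the V1 and the V2 entry points, so a query that names "socket" needs no change. The traits below are simplified, hypothetical stand-ins (their signatures are not Spark's), `SocketProvider` and `SourceLookup` are illustrative names, and preferring the V2 path at lookup time is an assumption rather than something this thread states.

```scala
// Simplified, hypothetical stand-ins for the Spark interfaces; signatures are NOT Spark's.
trait DataSourceRegister { def shortName(): String }
trait StreamSourceProvider { def createSource(options: Map[String, String]): String }
trait MicroBatchReadSupport { def createMicroBatchReader(options: Map[String, String]): String }

// One provider, registered once under "socket", exposing both the V1 and V2 entry points.
class SocketProvider extends DataSourceRegister
    with StreamSourceProvider with MicroBatchReadSupport {
  private def endpoint(options: Map[String, String]): String =
    options("host") + ":" + options("port")

  override def shortName(): String = "socket"
  override def createSource(options: Map[String, String]): String =
    "v1 source for " + endpoint(options)
  override def createMicroBatchReader(options: Map[String, String]): String =
    "v2 reader for " + endpoint(options)
}

object SourceLookup {
  private val registered: Seq[DataSourceRegister] = Seq(new SocketProvider)

  // Assumption: prefer the V2 path when the provider supports it, else fall back to V1.
  def load(name: String, options: Map[String, String]): String =
    registered.find(_.shortName() == name) match {
      case Some(v2: MicroBatchReadSupport) => v2.createMicroBatchReader(options)
      case Some(v1: StreamSourceProvider) => v1.createSource(options)
      case Some(_) => sys.error("source supports neither path: " + name)
      case None => sys.error("unknown source: " + name)
    }
}
```

Because the short name is unchanged, the existing test suites would exercise whichever path the lookup selects, which matches the validation argument made in the comment.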


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/703/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167127081
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         readThread.start()
       }
     
    -  /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    +  override def setOffsetRange(
    +      start: Optional[Offset],
    --- End diff --
    
    nit: won't this fit on a single line?


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87383/testReport)** for PR 20382 at commit [`f3fc90c`](https://github.com/apache/spark/commit/f3fc90cc94210f313861625b5a8fe6ef754c05bd).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1181/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87660/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Hi @tdas, I'm on vacation this week and will update the code when I have time. Sorry for the delay.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164944362
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TextSocketReader.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.util.Calendar
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable.ListBuffer
    +
    +import org.apache.spark.internal.Logging
    +
    +trait TextSocketReader extends Logging {
    --- End diff --
    
    Please add docs! This is a base interface used by two source implementations.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87383/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171226477
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -61,13 +68,13 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
        * Stored in a ListBuffer to facilitate removing committed batches.
        */
       @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    +  private val batches = new ListBuffer[(String, Timestamp)]
     
       @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    +  private[sources] var currentOffset: LongOffset = LongOffset(-1L)
    --- End diff --
    
    This does not make sense. You are directly accessing something that should only be accessed while synchronized on `this`.
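
A minimal sketch of the concern raised above, assuming a simplified stand-in class rather than the reader in the diff: the guarded field stays private, and every read or write goes through a method that synchronizes on the instance. `GuardedReader`, `onLineRead`, and `getOffsetInternal` are hypothetical names, and `LongOffset` is redefined locally so the example runs without Spark.

```scala
// Hypothetical stand-in for Spark's LongOffset, defined locally to stay self-contained.
final case class LongOffset(offset: Long)

// Hypothetical reader; only the locking pattern is the point here.
class GuardedReader {
  // Guarded by `this`: read or written only while holding the instance lock.
  private var currentOffset: LongOffset = LongOffset(-1L)

  /** Called by the socket-reading thread for each received line. */
  def onLineRead(): Unit = synchronized {
    currentOffset = LongOffset(currentOffset.offset + 1)
  }

  /** Synchronized accessor, so tests never touch the field directly. */
  def getOffsetInternal: LongOffset = synchronized { currentOffset }
}

object GuardedReaderDemo {
  def main(args: Array[String]): Unit = {
    val reader = new GuardedReader
    // Four writer threads, 1000 increments each.
    val writers = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = (1 to 1000).foreach(_ => reader.onLineRead())
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    println(reader.getOffsetInternal) // LongOffset(3999)
  }
}
```

Widening the field to `private[sources]` would let a test read it without taking the lock; an accessor like the one sketched here keeps the guard in one place.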


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87370 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87370/testReport)** for PR 20382 at commit [`068c050`](https://github.com/apache/spark/commit/068c050547a3ae002ac77d0ea2d48e2b82caa049).
     * This patch **fails due to an unknown error code, -9**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87667 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87667/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163614088
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    +  override def shortName(): String = "socketv2"
    +
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceV2Options): MicroBatchReader = {
    +    logWarning("The socket source should not be used for production applications! " +
    +      "It does not support recovery.")
    +    if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
    +      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    +    }
    +    if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
    +      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    +    }
    +    if (schema.isPresent) {
    +      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    }
    +
    +    if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
    +      Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match {
    +        case Success(bool) =>
    +        case Failure(_) =>
    +          throw new AnalysisException(
    +            "includeTimestamp must be set to either \"true\" or \"false\"")
    +      }
    +    }
    +
    +    new TextSocketStreamMicroBatchReader(options)
    +  }
    +}
    +
    +case class TextSocketStreamOffset(offset: Long) extends Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with Logging {
    +
    +  import TextSocketSourceProviderV2._
    +
    +  private var start: TextSocketStreamOffset = _
    +  private var end: TextSocketStreamOffset = _
    +
    +  private val host = options.get(HOST).get()
    +  private val port = options.get(PORT).get().toInt
    +  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
    +  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val batches = new ListBuffer[(String, Timestamp)]
    +
    +  private val currentOffset = new AtomicLong(-1L)
    +
    +  private var initialized = false
    +
    +  @GuardedBy("this")
    +  private var lastOffsetCommitted: Long = -1L
    +
    +  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
    +    if (!initialized) {
    --- End diff --
    
    Is it possible to initialize in the constructor?
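
A rough sketch of what the question above is asking for, under loud assumptions: `EagerReader` is a hypothetical, simplified class, offsets are plain longs, and `connect()` is a placeholder that opens no real socket. The point is only that work done in the constructor body removes the need for an `initialized` flag inside `setOffsetRange`.

```scala
import java.util.Optional

// Hypothetical, simplified reader: offsets are plain longs and no real socket is opened.
class EagerReader(host: String, port: Int) {
  // Evaluated as part of the constructor, so a failure would surface at construction time.
  val endpoint: String = connect()

  private var start: Long = -1L
  private var end: Long = -1L

  // Placeholder for opening the socket and starting the read thread.
  private def connect(): String = host + ":" + port

  // No `initialized` check needed: the constructor has already run connect().
  def setOffsetRange(
      newStart: Optional[java.lang.Long],
      newEnd: Optional[java.lang.Long]): Unit = {
    start = if (newStart.isPresent) newStart.get().longValue() else -1L
    end = if (newEnd.isPresent) newEnd.get().longValue() else -1L
  }

  def range: (Long, Long) = (start, end)
}

object EagerReaderDemo {
  def main(args: Array[String]): Unit = {
    val reader = new EagerReader("localhost", 9999)
    reader.setOffsetRange(Optional.empty(), Optional.of(java.lang.Long.valueOf(5L)))
    println(reader.range) // (-1,5)
  }
}
```

The trade-off is that connection errors move to construction time. The "no server up" test quoted elsewhere in this thread expects an `IOException` from `createMicroBatchReader`, which is consistent with connecting eagerly.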


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Build finished. Test FAILed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167126686
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -15,40 +15,48 @@
      * limitations under the License.
      */
     
    -package org.apache.spark.sql.execution.streaming
    +package org.apache.spark.sql.execution.streaming.sources
     
     import java.io.{BufferedReader, InputStreamReader, IOException}
     import java.net.Socket
     import java.sql.Timestamp
     import java.text.SimpleDateFormat
    -import java.util.{Calendar, Locale}
    +import java.util.{Calendar, List => JList, Locale, Optional}
     import javax.annotation.concurrent.GuardedBy
     
    +import scala.collection.JavaConverters._
     import scala.collection.mutable.ListBuffer
     import scala.util.{Failure, Success, Try}
     
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql._
    -import org.apache.spark.sql.catalyst.InternalRow
    -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
     import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    -import org.apache.spark.unsafe.types.UTF8String
     
    -
    -object TextSocketSource {
    +object TextSocketMicroBatchReader {
       val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
       val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
         StructField("timestamp", TimestampType) :: Nil)
       val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
     }
     
     /**
    - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
    - * This source will *not* work in production applications due to multiple reasons, including no
    - * support for fault recovery and keeping all of the text read in memory forever.
    + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This MicroBatchReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery and keeping all of the text read in memory
    + * forever.
    --- End diff --
    
    This does not keep it forever, so remove this reason and just keep "no support for fault recovery".


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167713045
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -15,40 +15,48 @@
      * limitations under the License.
      */
     
    -package org.apache.spark.sql.execution.streaming
    +package org.apache.spark.sql.execution.streaming.sources
     
     import java.io.{BufferedReader, InputStreamReader, IOException}
     import java.net.Socket
     import java.sql.Timestamp
     import java.text.SimpleDateFormat
    -import java.util.{Calendar, Locale}
    +import java.util.{Calendar, List => JList, Locale, Optional}
     import javax.annotation.concurrent.GuardedBy
     
    +import scala.collection.JavaConverters._
     import scala.collection.mutable.ListBuffer
     import scala.util.{Failure, Success, Try}
     
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql._
    -import org.apache.spark.sql.catalyst.InternalRow
    -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
     import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    -import org.apache.spark.unsafe.types.UTF8String
     
    -
    -object TextSocketSource {
    +object TextSocketMicroBatchReader {
       val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
       val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
         StructField("timestamp", TimestampType) :: Nil)
       val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
     }
     
     /**
    - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
    - * This source will *not* work in production applications due to multiple reasons, including no
    - * support for fault recovery and keeping all of the text read in memory forever.
    + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
    --- End diff --
    
    Tutorials is correct here; see e.g. StructuredSessionization.scala


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167776323
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    --- End diff --
    
    Sorry @tdas, I did it with a plain "mv" rather than "git mv". The file doesn't change much; it is only adapted to fit the data source V2 API.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87819/testReport)** for PR 20382 at commit [`1073be4`](https://github.com/apache/spark/commit/1073be420b2cc5fd099929fc0215bf8c1be4b6e0).


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171511528
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there " +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    --- End diff --
    
    I see. Will update it.
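
As an aside on the final assertion in the quoted test: the source builds its timestamps through a `yyyy-MM-dd HH:mm:ss` format, so two lines read within the same second get equal timestamps. The sketch below illustrates that under stated assumptions; `SecondGranularity` and `stamp` are hypothetical names, and the epoch value in the usage note is arbitrary.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object SecondGranularity {
  // Same pattern as DATE_FORMAT in the diff. SimpleDateFormat is not thread-safe,
  // so this sketch creates a fresh instance per call.
  def stamp(millis: Long): Timestamp = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    // Formatting drops the milliseconds; parsing the text back yields a whole second.
    Timestamp.valueOf(fmt.format(new Date(millis)))
  }
}
```

For example, `SecondGranularity.stamp(1516780800100L)` and `SecondGranularity.stamp(1516780800900L)` compare equal, so a check of the form `!later.before(earlier)` holds where a strict `later.after(earlier)` would not.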


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/847/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87370 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87370/testReport)** for PR 20382 at commit [`068c050`](https://github.com/apache/spark/commit/068c050547a3ae002ac77d0ea2d48e2b82caa049).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87371 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87371/testReport)** for PR 20382 at commit [`647c5cd`](https://github.com/apache/spark/commit/647c5cdd1e3cb4138b597bd429e01308f50468a6).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86677/testReport)** for PR 20382 at commit [`9ceb3be`](https://github.com/apache/spark/commit/9ceb3be4a5a7a451ae740cf563792636f879a81b).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87667/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Sure, I will wait for the others to be merged. Thanks @tdas.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86671 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86671/testReport)** for PR 20382 at commit [`56c60f3`](https://github.com/apache/spark/commit/56c60f3d9d920cea095e78695544b371435ca6f5).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164937540
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with TextSocketReader with Logging {
    +
    +  private var startOffset: TextSocketOffset = _
    +  private var endOffset: TextSocketOffset = _
    +
    +  protected val host: String = options.get("host").get()
    +  protected val port: Int = options.get("port").get().toInt
     
    -    batches.trimStart(offsetDiff)
    -    lastOffsetCommitted = newOffset
    +  initialize()
    +
    +  override def setOffsetRange(start: Optional[V2Offset], end: Optional[V2Offset]): Unit = {
    +    startOffset = start.orElse(TextSocketOffset(-1L)).asInstanceOf[TextSocketOffset]
    +    endOffset = end.orElse(
    +      TextSocketOffset(getOffsetInternal.getOrElse(-1L))).asInstanceOf[TextSocketOffset]
       }
     
    -  /** Stop this source. */
    -  override def stop(): Unit = synchronized {
    -    if (socket != null) {
    -      try {
    -        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    -        // stop the readThread is to close the socket.
    -        socket.close()
    -      } catch {
    -        case e: IOException =>
    -      }
    -      socket = null
    +  override def getStartOffset(): V2Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): V2Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): V2Offset = {
    +    TextSocketOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val rawList = getBatchInternal(Option(startOffset.offset), Option(endOffset.offset))
    +
    +    assert(SparkSession.getActiveSession.isDefined)
    +    val spark = SparkSession.getActiveSession.get
    +    val numPartitions = spark.sparkContext.defaultParallelism
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
         }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def commit(end: V2Offset): Unit = {
    +    val newOffset = end.asInstanceOf[TextSocketOffset]
    +    commitInternal(newOffset.offset)
    +  }
    +
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
    --- End diff --
    
    Ah, I see the earlier comments.
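
To make the `createReadTasks` logic in the quoted diff easier to follow, here is a minimal sketch of its two ideas in plain Scala: round-robin distribution of buffered records across partitions, and a cursor with the `next()`/`get()` shape of the reader. `RoundRobinSlices` and `SliceCursor` are hypothetical names for illustration and do not depend on Spark.

```scala
import scala.collection.mutable.ListBuffer

object RoundRobinSlices {
  // Assigns record i to bucket (i % numPartitions), as the diff does with its slices array.
  def slice[T](records: Seq[T], numPartitions: Int): Vector[List[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val slices = Vector.fill(numPartitions)(new ListBuffer[T])
    records.zipWithIndex.foreach { case (r, idx) =>
      slices(idx % numPartitions).append(r)
    }
    slices.map(_.toList)
  }

  // Cursor over one slice: next() advances and reports whether a record is available,
  // get() returns the record at the current position.
  final class SliceCursor[T](slice: IndexedSeq[T]) {
    private var currentIdx = -1

    def next(): Boolean = {
      currentIdx += 1
      currentIdx < slice.size
    }

    def get(): T = slice(currentIdx)
  }
}
```

The cursor takes an `IndexedSeq` because positional lookup on a `ListBuffer` is linear in the index; whether that matters for a debugging-only source is a judgment call.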



---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/231/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Hi @jose-torres, thanks for reviewing. I tried both the example you mentioned and a simple spark-shell command, and I think it works, but the path always goes to the V2 `MicroBatchReader` (we still need your PR to fall back to the V1 Source).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87667 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87667/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87819 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87819/testReport)** for PR 20382 at commit [`1073be4`](https://github.com/apache/spark/commit/1073be420b2cc5fd099929fc0215bf8c1be4b6e0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    @jerryshao please address the above comment, then we are good to merge!


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163755819
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util._
    +import java.util.{List => JList}
    +import java.util.concurrent.atomic.AtomicLong
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.sources.DataSourceRegister
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
    +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object TextSocketSourceProviderV2 {
    +  val HOST = "host"
    +  val PORT = "port"
    +  val INCLUDE_TIMESTAMP = "includeTimestamp"
    +  val NUM_PARTITIONS = "numPartitions"
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
    +    StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +class TextSocketSourceProviderV2 extends DataSourceV2
    +    with MicroBatchReadSupport with DataSourceRegister with Logging {
    --- End diff --
    
    I see, thanks for the clarification. Let me change it.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87372 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87372/testReport)** for PR 20382 at commit [`f3fc90c`](https://github.com/apache/spark/commit/f3fc90cc94210f313861625b5a8fe6ef754c05bd).


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164933597
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
           .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    +  override def json(): String = offset.toString
    +}
    +
    +class TextSocketMicroBatchReader(options: DataSourceV2Options)
    +  extends MicroBatchReader with TextSocketReader with Logging {
    +
    +  private var startOffset: TextSocketOffset = _
    +  private var endOffset: TextSocketOffset = _
    +
    +  protected val host: String = options.get("host").get()
    +  protected val port: Int = options.get("port").get().toInt
     
    -    batches.trimStart(offsetDiff)
    -    lastOffsetCommitted = newOffset
    +  initialize()
    +
    +  override def setOffsetRange(start: Optional[V2Offset], end: Optional[V2Offset]): Unit = {
    +    startOffset = start.orElse(TextSocketOffset(-1L)).asInstanceOf[TextSocketOffset]
    +    endOffset = end.orElse(
    +      TextSocketOffset(getOffsetInternal.getOrElse(-1L))).asInstanceOf[TextSocketOffset]
       }
     
    -  /** Stop this source. */
    -  override def stop(): Unit = synchronized {
    -    if (socket != null) {
    -      try {
    -        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    -        // stop the readThread is to close the socket.
    -        socket.close()
    -      } catch {
    -        case e: IOException =>
    -      }
    -      socket = null
    +  override def getStartOffset(): V2Offset = {
    +    Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set"))
    +  }
    +
    +  override def getEndOffset(): V2Offset = {
    +    Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set"))
    +  }
    +
    +  override def deserializeOffset(json: String): V2Offset = {
    +    TextSocketOffset(json.toLong)
    +  }
    +
    +  override def readSchema(): StructType = {
    +    val includeTimestamp = options.getBoolean("includeTimestamp", false)
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +  }
    +
    +  override def createReadTasks(): JList[ReadTask[Row]] = {
    +    val rawList = getBatchInternal(Option(startOffset.offset), Option(endOffset.offset))
    +
    +    assert(SparkSession.getActiveSession.isDefined)
    +    val spark = SparkSession.getActiveSession.get
    +    val numPartitions = spark.sparkContext.defaultParallelism
    +
    +    val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +    rawList.zipWithIndex.foreach { case (r, idx) =>
    +      slices(idx % numPartitions).append(r)
         }
    +
    +    (0 until numPartitions).map { i =>
    +      val slice = slices(i)
    +      new ReadTask[Row] {
    +        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    +          private var currentIdx = -1
    +
    +          override def next(): Boolean = {
    +            currentIdx += 1
    +            currentIdx < slice.size
    +          }
    +
    +          override def get(): Row = {
    +            Row(slice(currentIdx)._1, slice(currentIdx)._2)
    +          }
    +
    +          override def close(): Unit = {}
    +        }
    +      }
    +    }.toList.asJava
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def commit(end: V2Offset): Unit = {
    +    val newOffset = end.asInstanceOf[TextSocketOffset]
    +    commitInternal(newOffset.offset)
    +  }
    +
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
    --- End diff --
    
    If I understand @jose-torres's intention correctly, he wanted this socket source to also work in the V1 code path.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Sorry for the delay, @tdas. I'm working on this and will push new changes soon.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r171506698
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,300 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.IOException
    +import java.net.InetSocketAddress
    +import java.nio.ByteBuffer
    +import java.nio.channels.ServerSocketChannel
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.execution.datasources.DataSource
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  case class AddSocketData(data: String*) extends AddData {
    +    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active socket source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find socket source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the socket source in the StreamExecution logical plan as there" +
    +            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val socketSource = sources.head
    +
    +      assert(serverThread != null && serverThread.port != 0)
    +      val currOffset = socketSource.currentOffset
    +      data.foreach(serverThread.enqueue)
    +
    +      val newOffset = LongOffset(currOffset.offset + data.size)
    +      (socketSource, newOffset)
    +    }
    +
    +    override def toString: String = s"AddSocketData(data = $data)"
    +  }
    +
    +  test("backward compatibility with old path") {
    +    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    +      spark.sqlContext.conf).newInstance() match {
    +      case ds: MicroBatchReadSupport =>
    +        assert(ds.isInstanceOf[TextSocketSourceProvider])
    +      case _ =>
    +        throw new IllegalStateException("Could not find socket source")
    +    }
    +  }
    +
    +  test("basic usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val ref = spark
    +      import ref.implicits._
    +
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +        .load()
    +        .as[String]
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) :: Nil))
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswer("hello"),
    +        AddSocketData("world"),
    +        CheckLastBatch("world"),
    +        CheckAnswer("hello", "world"),
    +        StopStream
    +      )
    +    }
    +  }
    +
    +  test("timestamped usage") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      val socket = spark
    +        .readStream
    +        .format("socket")
    +        .options(Map(
    +          "host" -> "localhost",
    +          "port" -> serverThread.port.toString,
    +          "includeTimestamp" -> "true"))
    +        .load()
    +
    +      assert(socket.schema === StructType(StructField("value", StringType) ::
    +        StructField("timestamp", TimestampType) :: Nil))
    +
    +      var batch1Stamp: Timestamp = null
    +      var batch2Stamp: Timestamp = null
    +
    +      testStream(socket)(
    +        StartStream(),
    +        AddSocketData("hello"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "hello")
    +            batch1Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        AddSocketData("world"),
    +        CheckAnswerRowsByFunc(
    +          rows => {
    +            assert(rows.size === 1)
    +            assert(rows.head.getAs[String](0) === "world")
    +            batch2Stamp = rows.head.getAs[Timestamp](1)
    +          },
    +          true),
    +        StopStream
    +      )
    +
    +      assert(!batch2Stamp.before(batch1Stamp))
    +    }
    +  }
    +
    +  test("params not given") {
    +    val provider = new TextSocketSourceProvider
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map.empty[String, String].asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("host" -> "localhost").asJava))
    +    }
    +    intercept[AnalysisException] {
    +      provider.createMicroBatchReader(Optional.empty(), "",
    +        new DataSourceOptions(Map("port" -> "1234").asJava))
    +    }
    +  }
    +
    +  test("non-boolean includeTimestamp") {
    +    val provider = new TextSocketSourceProvider
    +    val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle")
    +    intercept[AnalysisException] {
    +      val a = new DataSourceOptions(params.asJava)
    +      provider.createMicroBatchReader(Optional.empty(), "", a)
    +    }
    +  }
    +
    +  test("user-specified schema given") {
    +    val provider = new TextSocketSourceProvider
    +    val userSpecifiedSchema = StructType(
    +      StructField("name", StringType) ::
    +      StructField("area", StringType) :: Nil)
    +    val params = Map("host" -> "localhost", "port" -> "1234")
    +    val exception = intercept[AnalysisException] {
    +      provider.createMicroBatchReader(
    +        Optional.of(userSpecifiedSchema), "", new DataSourceOptions(params.asJava))
    +    }
    +    assert(exception.getMessage.contains(
    +      "socket source does not support a user-specified schema"))
    +  }
    +
    +  test("no server up") {
    +    val provider = new TextSocketSourceProvider
    +    val parameters = Map("host" -> "localhost", "port" -> "0")
    +    intercept[IOException] {
    +      batchReader = provider.createMicroBatchReader(
    +        Optional.empty(), "", new DataSourceOptions(parameters.asJava))
    +    }
    --- End diff --
    
    In my local test, the exception message is `Can't assign requested address`, but on Jenkins it is `Connection refused`. The difference is probably due to the different OS or native method implementation.
     
    I think it would be better not to check the message, since it differs between platforms. Even if we change the check to match the Jenkins message, the test would still fail on my local Mac.
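
A minimal sketch of the idea above, assuming only the JDK and the Scala standard library: the check matches on the exception type (`IOException`) and never inspects the platform-dependent message. The object name `NoServerUpSketch` and the helper `tryConnect` are hypothetical and are not part of the patch; port 0 mirrors the "no server up" test in the diff.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

import scala.util.{Failure, Success, Try}

// Hypothetical helper object, for illustration only.
object NoServerUpSketch {

  /** Tries to connect and returns the outcome instead of throwing. */
  def tryConnect(host: String, port: Int): Try[Unit] = Try {
    val socket = new Socket()
    try socket.connect(new InetSocketAddress(host, port), 1000)
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    // No server can listen on port 0, so the connection attempt is expected to fail.
    tryConnect("localhost", 0) match {
      // Match on the exception type only; the message text varies by OS.
      case Failure(_: IOException) => println("got an IOException, as expected")
      case Failure(other) => throw other
      case Success(_) => sys.error("connection unexpectedly succeeded")
    }
  }
}
```

Matching on the type keeps the assertion stable whether the platform reports `Connection refused` or `Can't assign requested address`.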


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r167128485
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala ---
    @@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
         }
       }
     
    -  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
     }
     
    -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    -  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    -    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
    -      case Success(bool) => bool
    -      case Failure(_) =>
    -        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
    -    }
    -  }
    +class TextSocketSourceProvider extends DataSourceV2
    +  with MicroBatchReadSupport with DataSourceRegister with Logging {
     
    -  /** Returns the name and schema of the source that can be used to continually read data. */
    -  override def sourceSchema(
    -      sqlContext: SQLContext,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): (String, StructType) = {
    +  private def checkParameters(params: Map[String, String]): Unit = {
         logWarning("The socket source should not be used for production applications! " +
           "It does not support recovery.")
    -    if (!parameters.contains("host")) {
    +    if (!params.contains("host")) {
           throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
         }
    -    if (!parameters.contains("port")) {
    +    if (!params.contains("port")) {
           throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
         }
    -    if (schema.nonEmpty) {
    -      throw new AnalysisException("The socket source does not support a user-specified schema.")
    +    Try {
    +      params.get("includeTimestamp")
    +        .orElse(params.get("includetimestamp"))
    +        .getOrElse("false")
    +        .toBoolean
    +    } match {
    +      case Success(_) =>
    +      case Failure(_) =>
    +        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
         }
    -
    -    val sourceSchema =
    -      if (parseIncludeTimestamp(parameters)) {
    -        TextSocketSource.SCHEMA_TIMESTAMP
    -      } else {
    -        TextSocketSource.SCHEMA_REGULAR
    -      }
    -    ("textSocket", sourceSchema)
       }
     
    -  override def createSource(
    -      sqlContext: SQLContext,
    -      metadataPath: String,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): Source = {
    -    val host = parameters("host")
    -    val port = parameters("port").toInt
    -    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
    +  override def createMicroBatchReader(
    +      schema: Optional[StructType],
    +      checkpointLocation: String,
    +      options: DataSourceOptions): MicroBatchReader = {
    +    checkParameters(options.asMap().asScala.toMap)
    --- End diff --
    
    Why not check it as `DataSourceOptions` (which is known to be case-insensitive) rather than as a map, which raises questions about case sensitivity?
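
The suggestion can be illustrated with a small self-contained sketch. It does not use Spark's `DataSourceOptions`; `CaseInsensitiveOptions` and `SocketOptionCheckSketch` are hypothetical stand-ins that only show why validating against a case-insensitive holder removes the need for the `includeTimestamp` / `includetimestamp` double lookup in the diff. Returning `Either` instead of throwing `AnalysisException` is also an assumption made to keep the sketch free of Spark dependencies.

```scala
import java.util.Locale

import scala.util.Try

/** Hypothetical stand-in for a case-insensitive options holder; not Spark's class. */
final class CaseInsensitiveOptions(raw: Map[String, String]) {
  // Normalize keys once so that every lookup ignores case.
  private val normalized: Map[String, String] =
    raw.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def get(key: String): Option[String] = normalized.get(key.toLowerCase(Locale.ROOT))
}

object SocketOptionCheckSketch {

  /** Returns Right(includeTimestamp) when the options are valid, otherwise Left(message). */
  def check(options: CaseInsensitiveOptions): Either[String, Boolean] = {
    if (options.get("host").isEmpty) {
      Left("Set a host to read from with option(\"host\", ...).")
    } else if (options.get("port").isEmpty) {
      Left("Set a port to read from with option(\"port\", ...).")
    } else {
      // A single lookup is enough because the holder already ignores key case.
      Try(options.get("includeTimestamp").getOrElse("false").toBoolean)
        .toOption
        .toRight("includeTimestamp must be set to either \"true\" or \"false\"")
    }
  }
}
```

With this shape, `Map("HOST" -> "localhost", "Port" -> "1234")` passes validation regardless of how the caller capitalized the keys, while a misspelled value such as `"fasle"` is still rejected.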


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86640 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86640/testReport)** for PR 20382 at commit [`56c60f3`](https://github.com/apache/spark/commit/56c60f3d9d920cea095e78695544b371435ca6f5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r163487603
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
         println("-------------------------------------------")
         // scalastyle:off println
         spark
    -      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
    +      .createDataFrame(rows.toList.asJava, schema)
    --- End diff --
    
    Changed this to avoid triggering a new distributed job.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87659/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87660/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87202 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87202/testReport)** for PR 20382 at commit [`874c91c`](https://github.com/apache/spark/commit/874c91c41942972cabb85be175f929fc62e74af7).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86677/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/839/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #86671 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86671/testReport)** for PR 20382 at commit [`56c60f3`](https://github.com/apache/spark/commit/56c60f3d9d920cea095e78695544b371435ca6f5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    jenkins test this please


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Jenkins, retest this please.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87831 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87831/testReport)** for PR 20382 at commit [`762f1da`](https://github.com/apache/spark/commit/762f1da952eae99fc7b377a08267c0d4cdaf00ee).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87203 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87203/testReport)** for PR 20382 at commit [`874c91c`](https://github.com/apache/spark/commit/874c91c41942972cabb85be175f929fc62e74af7).


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1047/


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    I am holding off on further comments on this PR until the major change of eliminating the v1 Source is done. That would cause significant refactoring (including the fact that the common trait won't be needed).
    
    BTW, I strongly suggest moving the socket code to execution.streaming.sources, like other v2 sources.



---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/261/


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r168138470
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import java.io.{IOException, OutputStreamWriter}
    +import java.net.ServerSocket
    +import java.sql.Timestamp
    +import java.util.Optional
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.scalatest.BeforeAndAfterEach
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{AnalysisException, Row}
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
    +
    +  override def afterEach() {
    +    sqlContext.streams.active.foreach(_.stop())
    +    if (serverThread != null) {
    +      serverThread.interrupt()
    +      serverThread.join()
    +      serverThread = null
    +    }
    +    if (batchReader != null) {
    +      batchReader.stop()
    +      batchReader = null
    +    }
    +  }
    +
    +  private var serverThread: ServerThread = null
    +  private var batchReader: MicroBatchReader = null
    +
    +  test("V2 basic usage") {
    --- End diff --
    
    These updated tests are getting more complicated because they call the low-level data source APIs directly. Can you convert them to higher-level tests, like the Kafka ones?
    If it gets too complicated to make them work with `testStream`, you can simply use `query.processAllAvailable`. Then we won't have to worry about changing APIs any more.
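
A rough sketch of what such a higher-level check could look like, written as a standalone program rather than as part of `TextSocketStreamSuite`. The object name `SocketSourceSketch`, the query name `socket_lines`, the sample lines and the 10-second deadline are all assumptions made for illustration. It only uses the public `readStream`/`writeStream` API, so it does not depend on the reader internals. Because socket data arrives asynchronously, the sketch polls around `processAllAvailable` instead of relying on a single call.

```scala
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

import org.apache.spark.sql.SparkSession

// Hypothetical standalone example; names and timeouts are illustrative only.
object SocketSourceSketch {
  // Kept in a field so the connection stays open until we close it explicitly.
  @volatile private var client: Socket = null

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("socket-source-sketch")
      .getOrCreate()

    // Port 0 lets the OS pick a free port; the socket is listening immediately.
    val server = new ServerSocket(0)
    val feeder = new Thread(new Runnable {
      override def run(): Unit = {
        client = server.accept()
        val out = new PrintWriter(client.getOutputStream, true)
        out.println("hello")
        out.println("world")
      }
    })
    feeder.setDaemon(true)
    feeder.start()

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", server.getLocalPort.toString)
      .load()

    // The memory sink exposes results as a temp view named after the query.
    val query = lines.writeStream
      .format("memory")
      .queryName("socket_lines")
      .outputMode("append")
      .start()

    try {
      val deadline = System.currentTimeMillis() + 10000L
      var count = 0L
      while (count < 2 && System.currentTimeMillis() < deadline) {
        query.processAllAvailable()
        count = spark.table("socket_lines").count()
        if (count < 2) Thread.sleep(100)
      }
      assert(count == 2, s"expected 2 rows from the socket source, got $count")
    } finally {
      query.stop()
      if (client != null) client.close()
      server.close()
      spark.stop()
    }
  }
}
```

Inside the suite itself, the same idea would use the shared session from `SharedSQLContext` instead of building a new one.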


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    **[Test build #87383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87383/testReport)** for PR 20382 at commit [`f3fc90c`](https://github.com/apache/spark/commit/f3fc90cc94210f313861625b5a8fe6ef754c05bd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Sure, I will do it.


---



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r168363424
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -583,7 +585,8 @@ object DataSource extends Logging {
           "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
           "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
           "org.apache.spark.ml.source.libsvm" -> libsvm,
    -      "com.databricks.spark.csv" -> csv
    +      "com.databricks.spark.csv" -> csv,
    +      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
    --- End diff --
    
    Please add a test for this!


---



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20382
  
    Merged build finished. Test PASSed.


---
