Posted to reviews@spark.apache.org by arunmahadevan <gi...@git.apache.org> on 2018/04/30 21:51:00 UTC

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/spark/pull/21199

    [SPARK-24127][SS] Continuous text socket source

    ## What changes were proposed in this pull request?
    
    Adds support for a text socket stream in Spark Structured Streaming "continuous" mode. This is roughly based on the idea of ContinuousMemoryStream, where the executors query the data from the driver over an RPC endpoint.
    
    This makes it possible to create a Structured Streaming continuous pipeline that ingests data via "nc", and to run the examples.
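
As a rough illustration of such a pipeline, the sketch below reads lines from a socket and echoes them to the console with a continuous trigger. The host, port, master setting, application name, and one-second checkpoint interval are assumptions chosen for the example, not values taken from this patch. It expects `nc -lk 9999` to be running locally and a Spark build that includes the continuous socket source.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousSocketExample {
  def main(args: Array[String]): Unit = {
    // Assumed local setup: continuous mode needs a free core for every partition.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ContinuousSocketExample")
      .getOrCreate()

    // Assumed endpoint: start `nc -lk 9999` in another terminal first.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Trigger.Continuous selects continuous processing; its argument is the
    // checkpoint interval.
    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

Lines typed into the `nc` session should then show up in the console sink without waiting for a micro-batch boundary.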
    
    ## How was this patch tested?
    
    Ran the Spark examples in Structured Streaming continuous mode.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/spark SPARK-24127

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21199
    
----
commit f7807fac2c39185893f78c7968eff8e9cafce991
Author: Arun Mahadevan <ar...@...>
Date:   2018-04-30T17:04:23Z

    SPARK-24127: Continuous text socket source

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    @jose-torres and @HeartSaVioR, is it good to go?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94148 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94148/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    add to whitelist


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206385714
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        val max = startOffset.offsets(partition) + buckets(partition).size
    +        if (offset > max) {
    +          throw new IllegalStateException("Invalid offset " + offset + " to commit" +
    +          " for partition " + partition + ". Max valid offset: " + max)
    +        }
    +        val n = offset - startOffset.offsets(partition)
    +        buckets(partition).trimStart(n)
    +    }
    +    startOffset = endOffset
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    --- End diff --
    
    Ideally we could deduplicate the code between the continuous and micro-batch readers by modifying the read thread to accept a handler for each new line, letting each reader handle the line accordingly under the proper lock. With this change, the same read thread could serve both the continuous and the micro-batch reader.
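
One way to picture this suggestion is the minimal sketch below. It is not code from the PR: `LineReadThread`, `BucketingHandler`, and `BatchHandler` are hypothetical names, and the thread reads from a generic `java.io.Reader` rather than a socket so that the sketch stays self-contained. Each reader passes its own handler and does its own locking inside it.

```scala
import java.io.{BufferedReader, IOException, Reader}
import scala.collection.mutable.ListBuffer

// Hypothetical shared read loop: it only knows how to read lines and hand
// each one to the supplied handler.
class LineReadThread(name: String, source: Reader, onLine: String => Unit)
  extends Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    val reader = new BufferedReader(source)
    try {
      var line = reader.readLine()
      while (line != null) {
        onLine(line)
        line = reader.readLine()
      }
    } catch {
      case _: IOException => // source closed; let the thread exit
    }
  }
}

// Continuous-style handler: distributes lines round-robin into per-partition
// buckets, guarded by the handler's own lock.
class BucketingHandler(numPartitions: Int) extends (String => Unit) {
  val buckets: Seq[ListBuffer[String]] =
    Seq.fill(numPartitions)(new ListBuffer[String])
  private var currentOffset = -1

  override def apply(line: String): Unit = synchronized {
    currentOffset += 1
    buckets(currentOffset % numPartitions) += line
  }
}

// Micro-batch-style handler: appends every line to a single buffer.
class BatchHandler extends (String => Unit) {
  val lines = new ListBuffer[String]

  override def apply(line: String): Unit = synchronized {
    lines += line
  }
}
```

With this shape, the socket handling and the end-of-stream check would live in one place, and only the handler would differ between the two readers.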


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94411 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94411/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90603 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90603/testReport)** for PR 21199 at commit [`b3a42f0`](https://github.com/apache/spark/commit/b3a42f08cba85b9bec11aaa3f75de298aa869204).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `case class ContinuousRecordPartitionOffset(partitionId: Int, offset: Int) extends PartitionOffset`
      * `case class GetRecord(offset: ContinuousRecordPartitionOffset)`
      * `class ContinuousRecordEndpoint(buckets: Seq[Seq[Any]], lock: Object)`


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206371107
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    --- End diff --
    
    I'd rather make this safer via one of two approaches:
    
    1. Assert that the partition offsets contain every partition id, 0 to numPartitions - 1.
    2. Add the partition id to each list element of TextSocketOffset, as RateStreamContinuousReader and RateStreamOffset do.
    
    Personally I prefer option 2, but either is fine with me. I'm not sure which the committers and the Spark community would prefer.
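
The two options above could look roughly like the sketch below. It is an illustration under assumptions, not code from the PR: `RecordPartitionOffset` is a stand-in for `ContinuousRecordPartitionOffset`, and `OffsetMerging`, `mergeChecked`, and `mergeKeyed` are hypothetical names.

```scala
// Stand-in for the PR's ContinuousRecordPartitionOffset, redefined here so
// the sketch is self-contained.
case class RecordPartitionOffset(partitionId: Int, offset: Int)

object OffsetMerging {
  // Option 1: check that the ids are exactly 0 .. numPartitions - 1 before
  // dropping them and relying on list position.
  def mergeChecked(offsets: Seq[RecordPartitionOffset], numPartitions: Int): List[Int] = {
    val sorted = offsets.sortBy(_.partitionId).toList
    val ids = sorted.map(_.partitionId)
    require(
      ids == List.range(0, numPartitions),
      s"expected partition ids 0 to ${numPartitions - 1}, got $ids")
    sorted.map(_.offset)
  }

  // Option 2: keep the partition id next to each offset, so nothing depends
  // on ordering.
  def mergeKeyed(offsets: Seq[RecordPartitionOffset]): Map[Int, Int] =
    offsets.map(o => o.partitionId -> o.offset).toMap
}
```

Option 1 keeps the serialized offset format unchanged, while option 2 changes what is stored in the offset and therefore what `deserializeOffset` would need to read back.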


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93801 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93801/testReport)** for PR 21199 at commit [`a069d01`](https://github.com/apache/spark/commit/a069d01a485b6066650b75e0cb9bfaf63710a9e2).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90744/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93797 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93797/testReport)** for PR 21199 at commit [`76512d8`](https://github.com/apache/spark/commit/76512d8ad86ea9e7301db0a9ac9fce12a66d3f40).


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206386593
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        val max = startOffset.offsets(partition) + buckets(partition).size
    +        if (offset > max) {
    +          throw new IllegalStateException("Invalid offset " + offset + " to commit" +
    +          " for partition " + partition + ". Max valid offset: " + max)
    +        }
    +        val n = offset - startOffset.offsets(partition)
    +        buckets(partition).trimStart(n)
    +    }
    +    startOffset = endOffset
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    +    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    +    // Thread continuously reads from a socket and inserts data into buckets
    +    readThread = new Thread(s"TextSocketContinuousReader($host, $port)") {
    +      setDaemon(true)
    +
    +      override def run(): Unit = {
    +        try {
    +          while (true) {
    +            val line = reader.readLine()
    +            if (line == null) {
    +              // End of file reached
    +              logWarning(s"Stream closed by $host:$port")
    +              return
    +            }
    +            TextSocketContinuousReader.this.synchronized {
    +              currentOffset += 1
    +              val newData = (line,
    +                Timestamp.valueOf(
    +                  TextSocketContinuousReader.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    +              )
    +              buckets(currentOffset % numPartitions) += newData
    +            }
    +          }
    +        } catch {
    +          case e: IOException =>
    +        }
    +      }
    +    }
    +
    +    readThread.start()
    +  }
    +
    +  override def toString: String = s"TextSocketContinuousReader[host: $host, port: $port]"
    +
    +  private def includeTimestamp: Boolean = options.getBoolean("includeTimestamp", false)
    +
    +}
    +
    +/**
    + * Continuous text socket input partition.
    + */
    +case class TextSocketContinuousInputPartition(
    +    driverEndpointName: String,
    +    partitionId: Int,
    +    startOffset: Int,
    +    includeTimestamp: Boolean)
    +extends InputPartition[Row] {
    +
    +  override def createPartitionReader(): InputPartitionReader[Row] =
    +    new TextSocketContinuousInputPartitionReader(driverEndpointName, partitionId, startOffset,
    +      includeTimestamp)
    +}
    +
    +/**
    + * Continuous text socket input partition reader.
    + *
    + * Polls the driver endpoint for new records.
    + */
    +class TextSocketContinuousInputPartitionReader(
    +    driverEndpointName: String,
    +    partitionId: Int,
    +    startOffset: Int,
    +    includeTimestamp: Boolean)
    +  extends ContinuousInputPartitionReader[Row] {
    +
    +  private val endpoint = RpcUtils.makeDriverRef(
    +    driverEndpointName,
    +    SparkEnv.get.conf,
    +    SparkEnv.get.rpcEnv)
    +
    +  private var currentOffset = startOffset
    +  private var current: Option[Row] = None
    +
    +  override def next(): Boolean = {
    +    try {
    +      current = getRecord
    +      while (current.isEmpty) {
    +        Thread.sleep(100)
    +        current = getRecord
    +      }
    +      currentOffset += 1
    +    } catch {
    +      case _: InterruptedException =>
    +        // Someone's trying to end the task; just let them.
    +        return false
    +    }
    +    true
    +  }
    +
    +  override def get(): Row = {
    +    current.get
    +  }
    +
    +  override def close(): Unit = {}
    +
    +  override def getOffset: PartitionOffset =
    +    ContinuousRecordPartitionOffset(partitionId, currentOffset)
    +
    +  private def getRecord: Option[Row] =
    +    endpoint.askSync[Option[Row]](GetRecord(
    +      ContinuousRecordPartitionOffset(partitionId, currentOffset)))
    +      .map(rec => {
    --- End diff --
    
    nit: according to the style guide, this may need to be written as follows:
    
    ```
    .map { rec => 
      if (includeTimestamp) {
    ...
    ```
    
    or even
    
    ```
    ContinuousRecordPartitionOffset(partitionId, currentOffset))).map { rec => 
      if (includeTimestamp) {
    ...
    ```
    
    https://github.com/databricks/scala-style-guide#anonymous-methods
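    
    For illustration, here is a minimal self-contained sketch of the brace form, using a plain `Option` in place of the RPC result. The `Rec` case class and the `render` helper are hypothetical stand-ins and are not part of this PR.
    
    ```scala
    // Hypothetical stand-in for the record returned by the driver endpoint.
    final case class Rec(value: String, ts: Long)

    object AnonymousMethodStyle {
      // Brace form: `.map { r => ... }` rather than `.map(r => { ... })`.
      def render(rec: Option[Rec], includeTimestamp: Boolean): Option[String] =
        rec.map { r =>
          if (includeTimestamp) {
            s"${r.value}@${r.ts}"
          } else {
            r.value
          }
        }
    }
    ```
    
    Both forms compile to the same thing; the brace form just drops one level of nesting for a multi-line closure.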


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    ping @tdas @jose-torres


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    retest this please


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    I wonder whether it is overkill to receive data on the driver side and publish it to the executors via RPC. This might give users the wrong impression that data should be received on the driver and then republished to the executors.
    
    Just my two cents.
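    
    To make the pattern under discussion concrete, here is a rough Spark-free sketch of driver-side buffering as I understand it from the diff: lines are assigned round-robin to per-partition buckets, and each executor task asks for the record at its own per-partition offset. The `DriverBuckets` class and its method names are hypothetical, and the sketch ignores the RPC layer and the trimming that happens on commit.
    
    ```scala
    import scala.collection.mutable.ListBuffer

    // Hypothetical model of the driver-side buffer; not the PR's actual classes.
    final class DriverBuckets(numPartitions: Int) {
      private val buckets = Seq.fill(numPartitions)(new ListBuffer[String])
      private var currentOffset = -1

      // Called by the socket-reading thread for every line received.
      def append(line: String): Unit = synchronized {
        currentOffset += 1
        buckets(currentOffset % numPartitions) += line
      }

      // Called on behalf of an executor task: returns the record at the given
      // per-partition offset, or None if it has not arrived yet.
      def getRecord(partition: Int, offset: Int): Option[String] = synchronized {
        val bucket = buckets(partition)
        if (offset >= 0 && offset < bucket.size) Some(bucket(offset)) else None
      }
    }
    ```
    
    Every record crosses the network twice in this model (socket to driver, then driver to executor), which is the overhead the comment above is pointing at.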


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    ping @jerryshao @tdas @jose-torres @HeartSaVioR for inputs.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93921 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93921/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93921 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93921/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93801 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93801/testReport)** for PR 21199 at commit [`a069d01`](https://github.com/apache/spark/commit/a069d01a485b6066650b75e0cb9bfaf63710a9e2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader`


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90755 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90755/testReport)** for PR 21199 at commit [`68c5eed`](https://github.com/apache/spark/commit/68c5eed7c6d32db8a7a469a1970e6fa731a91c70).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91615/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206388213
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    +      // inject rows, read and check the data and offsets
    +      for (i <- 0 until numRecords) {
    +        serverThread.enqueue(i.toString)
    +      }
    +      tasks.asScala.foreach {
    +        case t: TextSocketContinuousInputPartition =>
    +          val r = t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
    +          for (i <- 0 until numRecords / 2) {
    +            r.next()
    +            offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
    +            data.append(r.get().getString(0).toInt)
    +            if (i == 2) {
    +              commitOffset(t.partitionId, i + 1)
    +            }
    +          }
    +          assert(offsets.toSeq == Range.inclusive(1, 5))
    +          assert(data.toSeq == Range(t.partitionId, 10, 2))
    +          offsets.clear()
    +          data.clear()
    +        case _ => throw new IllegalStateException("Unexpected task type")
    +      }
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(3, 3))
    +      reader.commit(TextSocketOffset(List(5, 5)))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(5, 5))
    +    }
    +
    +    def commitOffset(partition: Int, offset: Int): Unit = {
    +      val offsetsToCommit = reader.getStartOffset.asInstanceOf[TextSocketOffset]
    +        .offsets.updated(partition, offset)
    +      reader.commit(TextSocketOffset(offsetsToCommit))
    +      assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == offsetsToCommit)
    +    }
    +  }
    +
    +  test("continuous data - invalid commit") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    // ok to commit same offset
    +    reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
    +    assertThrows[IllegalStateException] {
    +      reader.commit(TextSocketOffset(List(6, 6)))
    +    }
    +  }
    +
    +  test("continuous data with timestamp") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "includeTimestamp" -> "true",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 4
    +    import org.apache.spark.sql.Row
    --- End diff --
    
    Looks like an unused import.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    retest this please


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207078197
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    --- End diff --
    
    The companion object can be shared. But overall, I think we need to come up with better interfaces so that the micro-batch and continuous sources can share more code. I would investigate this outside the scope of this PR.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207078242
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    --- End diff --
    
    Will add


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94321 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94321/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    @HyukjinKwon this has been open for a while. Would you mind taking it forward?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93939 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93939/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged to master.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207096219
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    --- End diff --
    
    Yeah, agreed. I'm OK with this if the same implications are used in other places.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94321/
    Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    retest this please


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90744 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90744/testReport)** for PR 21199 at commit [`242bcdb`](https://github.com/apache/spark/commit/242bcdbacad2b3133447ba7e7cef100567ed3fd0).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90603/
    Test PASSed.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r189062212
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +
    +case class ContinuousRecordPartitionOffset(partitionId: Int, offset: Int) extends PartitionOffset
    +case class GetRecord(offset: ContinuousRecordPartitionOffset)
    +
    +/**
    + * A RPC end point for continuous readers to poll for
    + * records from the driver.
    + *
    + * @param buckets the data buckets
    --- End diff --
    
    nit: probably better to elaborate on what these are for; Seq[Seq[Any]] and Object aren't very informative types.
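
    As an illustration of what such parameter documentation might convey, here is a minimal, Spark-free sketch. The class name `RecordLookupSketch`, the method `getRecord`, and the lookup logic are hypothetical; the only details taken from the thread are the `Seq[Seq[Any]]` and `Object` parameter types and the idea that readers poll the driver for a record at a given partition offset.

    ```scala
    // Hypothetical sketch, not the ContinuousRecordEndpoint from the PR.
    // buckets: one sequence of records per partition, indexed by partition id.
    // lock:    the object the owner synchronizes on when it mutates the buckets.
    class RecordLookupSketch(buckets: Seq[Seq[Any]], lock: Object) {
      // Returns the record at `offset` within the partition's bucket,
      // or None when no record is available at that position yet.
      def getRecord(partitionId: Int, offset: Int): Option[Any] = lock.synchronized {
        val bucket = buckets(partitionId)
        if (offset >= 0 && offset < bucket.size) Some(bucket(offset)) else None
      }
    }
    ```

    Spelling out "one bucket per partition" and "the monitor guarding the buckets" in the scaladoc would give a reader the same information without needing more specific types.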


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94411/
    Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #91615 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91615/testReport)** for PR 21199 at commit [`68c5eed`](https://github.com/apache/spark/commit/68c5eed7c6d32db8a7a469a1970e6fa731a91c70).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206388466
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planRowInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    +      // inject rows, read and check the data and offsets
    --- End diff --
    
    Maybe adding more line comments in the code block would make the test code easier to understand, e.g. noting that it intentionally commits in the middle of the range.
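
    To make "committing in the middle of the range" concrete, here is a minimal sketch of the trimming that the reader's `commit` in this PR performs per partition with `trimStart`. The class `BucketSketch` and its sample data are hypothetical, and it models a single partition only.

    ```scala
    import scala.collection.mutable.ListBuffer

    // Hypothetical single-partition model of the reader's buffered records.
    class BucketSketch(initial: Seq[String]) {
      val bucket: ListBuffer[String] = ListBuffer(initial: _*)
      // Offset of the first record still held in the bucket.
      var startOffset: Int = 0

      // Drop every record below endOffset, then advance the start offset.
      def commit(endOffset: Int): Unit = {
        bucket.trimStart(endOffset - startOffset)
        startOffset = endOffset
      }
    }
    ```

    Committing offset 2 of a five-record bucket leaves the last three records in place, so a test can keep reading from the uncommitted tail afterwards.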


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93797/
    Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    The change looks broadly good (and important) to me. I'll defer to @HeartSaVioR for the in-depth review; let me know if there are any specific parts I should take a look at.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207078232
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    --- End diff --
    
    There is an assertion above, `assert(offsets.length == numPartitions)` (option 1). RateSource also uses similar validation. I am not sure that adding the index adds any value here, since the socket source does not support recovery. Even in the rate source, the partition values stored are `1...numPartitions-1`, and these can already be inferred from the index of the offset in the array.
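
    The point about the index being implied can be sketched without Spark. The case classes below are hypothetical stand-ins for `ContinuousRecordPartitionOffset` and `TextSocketOffset`; the body of `merge` follows the quoted `mergeOffsets`.

    ```scala
    // Hypothetical stand-ins for the Spark offset types in the quoted diff.
    case class PartitionOffsetSketch(partitionId: Int, offset: Int)
    case class MergedOffsetSketch(offsets: List[Int])

    object MergeOffsetsSketch {
      // Validate the partition count, order by partition id, and keep only
      // the per-partition offsets, as the quoted mergeOffsets does.
      def merge(
          offsets: Array[PartitionOffsetSketch],
          numPartitions: Int): MergedOffsetSketch = {
        assert(offsets.length == numPartitions)
        MergedOffsetSketch(offsets.sortBy(_.partitionId).map(_.offset).toList)
      }
    }
    ```

    After the sort, position `i` of the merged list holds the offset reported for partition `i`, provided each partition reports exactly once, so storing the partition id alongside each offset would be redundant.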


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90755/
    Test PASSed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94011 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94011/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93797 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93797/testReport)** for PR 21199 at commit [`76512d8`](https://github.com/apache/spark/commit/76512d8ad86ea9e7301db0a9ac9fce12a66d3f40).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90606 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90606/testReport)** for PR 21199 at commit [`b962c3d`](https://github.com/apache/spark/commit/b962c3dbd1715b2d4fa03e65731e36697cf37ff1).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Yes, this is similar to the micro-batch socket source, where the driver opens a single socket connection to read data from "nc". I would expect this pattern to be used only for debug and test sources, not for real ones. We can add some code comments to clarify this.
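
    A rough sketch of the driver-side pattern described here: lines read from the single socket are appended to per-partition buckets that executors later poll. The class `BucketingSketch` is hypothetical, and the round-robin assignment is an assumption made for illustration; the thread does not state how lines are distributed.

    ```scala
    import scala.collection.mutable.ListBuffer

    // Hypothetical sketch of driver-side buffering, with no socket or RPC.
    class BucketingSketch(numPartitions: Int) {
      private val buckets = Seq.fill(numPartitions)(new ListBuffer[String])
      private var currentOffset = -1

      // Called once per line received from the socket.
      // Round-robin placement is an assumption, not taken from the PR text.
      def add(line: String): Unit = synchronized {
        currentOffset += 1
        buckets(currentOffset % numPartitions) += line
      }

      // Immutable copy of the buckets, for inspection.
      def snapshot: Seq[List[String]] = synchronized(buckets.map(_.toList))
    }
    ```

    Because every record passes through the driver, throughput is bounded by that one connection, which fits the expectation that the pattern stays limited to debug and test sources.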


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93939/
    Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94411 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94411/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    A gentle ping for review @jose-torres , @jerryshao , @xuanyuanking


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    @arunmahadevan 
    Sorry, I forgot to review this until now. Could you fix the merge conflicts? I'd like to pull the code locally and review it, since the diff is not small.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    retest this please


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    @HeartSaVioR, I addressed your comments. Let me know if I missed something. I also rebased and had to change more code to use the new interfaces.
    
    I hope we can speed up review cycles in general rather than leaving PRs in hibernation for a while; otherwise the developer loses context and other things change in the meantime.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r186764630
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReader.GetRecord
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +
    +  case class GetRecord(offset: TextSocketPartitionOffset)
    +
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new RecordEndpoint()
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[TextSocketPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousDataReaderFactory(
    +          endpointName, i, offset): DataReaderFactory[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        buckets(partition).trimStart(offset - startOffset.offsets(partition))
    +    }
    +    startOffset = endOffset
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    +    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    +    // Thread continuously reads from a socket and inserts data into buckets
    +    readThread = new Thread(s"TextSocketContinuousReader($host, $port)") {
    +      setDaemon(true)
    +
    +      override def run(): Unit = {
    +        try {
    +          while (true) {
    +            val line = reader.readLine()
    +            if (line == null) {
    +              // End of file reached
    +              logWarning(s"Stream closed by $host:$port")
    +              return
    +            }
    +            TextSocketContinuousReader.this.synchronized {
    +              currentOffset += 1
    +              val newData = (line,
    +                Timestamp.valueOf(
    +                  TextSocketContinuousReader.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    +              )
    +              buckets(currentOffset % numPartitions) += newData
    +            }
    +          }
    +        } catch {
    +          case e: IOException =>
    +        }
    +      }
    +    }
    +
    +    readThread.start()
    +  }
    +
    +  /**
    +   * Endpoint for executors to poll for records.
    +   */
    +  private class RecordEndpoint extends ThreadSafeRpcEndpoint {
    +
    +    override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
    +
    +    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    +      case GetRecord(TextSocketPartitionOffset(partition, index)) =>
    +        TextSocketContinuousReader.this.synchronized {
    +          val bufIndex = index - startOffset.offsets(partition)
    +          val record = if (buckets(partition).size <= bufIndex) {
    +            None
    +          } else {
    +            Some(buckets(partition)(bufIndex))
    +          }
    +          context.reply(
    +            record.map(r => if (includeTimestamp) Row(r) else Row(r._1)
    +            )
    +          )
    --- End diff --
    
    ```
    context.reply(record.map(r => if (includeTimestamp) Row(r) else Row(r._1)))
    ```
    I think this fits on one line.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #93939 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93939/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206357959
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    --- End diff --
    
    While it makes sense to keep these values in a companion object, it looks redundant to define them in both the micro-batch and the continuous reader, so it might be better to move them into a common object.
    
    We may also need to look for other places where the micro-batch and continuous socket sources can be deduplicated.
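    
    A minimal sketch of the common object suggested above, assuming spark-sql is on the classpath. `TextSocketCommon` and `schemaFor` are hypothetical names chosen for illustration and are not part of this PR; the three values are copied from the `TextSocketContinuousReader` companion object in the diff.
    
    ```scala
    import java.text.SimpleDateFormat
    import java.util.Locale

    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    // Hypothetical shared holder: the object name is an assumption made for this sketch.
    object TextSocketCommon {
      val SCHEMA_REGULAR: StructType = StructType(StructField("value", StringType) :: Nil)
      val SCHEMA_TIMESTAMP: StructType = StructType(
        StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil)
      val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)

      // Hypothetical helper: both readers could pick their schema from one place.
      def schemaFor(includeTimestamp: Boolean): StructType =
        if (includeTimestamp) SCHEMA_TIMESTAMP else SCHEMA_REGULAR
    }
    ```
    
    With something like this in place, `readSchema()` in either reader could reduce to `TextSocketCommon.schemaFor(includeTimestamp)`.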


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207237894
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        val max = startOffset.offsets(partition) + buckets(partition).size
    +        if (offset > max) {
    +          throw new IllegalStateException("Invalid offset " + offset + " to commit" +
    +          " for partition " + partition + ". Max valid offset: " + max)
    +        }
    +        val n = offset - startOffset.offsets(partition)
    +        buckets(partition).trimStart(n)
    +    }
    +    startOffset = endOffset
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    --- End diff --
    
    filed: https://issues.apache.org/jira/browse/SPARK-25000


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r206384495
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    --- End diff --
    
    The micro-batch socket reader validates the type of `end` and calls `sys.error` with an informative message; it would be better to give a similarly meaningful message here.
    
    Btw, my 2 cents: a more specific exception is always better, so I'm +1 on throwing IllegalStateException, as the lines below do, rather than calling `sys.error`, which throws a RuntimeException.
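    
    A minimal sketch of the check described above. `Offset` and `TextSocketOffset` are stand-ins for the Spark types in the diff so that the snippet compiles on its own, and `CommitValidation.validateEndOffset` is a hypothetical helper, not code from this PR.
    
    ```scala
    // Stand-ins for the Spark types referenced in the diff (assumptions for this sketch).
    trait Offset
    final case class TextSocketOffset(offsets: List[Int]) extends Offset

    object CommitValidation {
      // Returns the typed offset, or fails with a descriptive IllegalStateException
      // instead of the ClassCastException that an unchecked asInstanceOf would raise.
      def validateEndOffset(end: Offset): TextSocketOffset = end match {
        case off: TextSocketOffset => off
        case other =>
          throw new IllegalStateException(
            "TextSocketContinuousReader.commit() received an offset of type " +
              s"${other.getClass.getName}; expected TextSocketOffset")
      }
    }
    ```
    
    `commit` could then begin with `val endOffset = CommitValidation.validateEndOffset(end)` in place of the cast.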


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21199


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93801/
    Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    
    @HeartSaVioR , rebased with master. 
    
    ping @jose-torres @tdas @zsxwing for review.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90755 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90755/testReport)** for PR 21199 at commit [`68c5eed`](https://github.com/apache/spark/commit/68c5eed7c6d32db8a7a469a1970e6fa731a91c70).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    ok to test


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #91615 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91615/testReport)** for PR 21199 at commit [`68c5eed`](https://github.com/apache/spark/commit/68c5eed7c6d32db8a7a469a1970e6fa731a91c70).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r189063593
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -256,7 +258,101 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    -  private class ServerThread extends Thread with Logging {
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    --- End diff --
    
    Can't we use testStream for these tests?


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    I won't be able to look at this in detail until next week.
    
    In general, I think this is a great source to have available. I wonder if it'd be worthwhile to try and abstract the record forwarding RPCs from here and ContinuousMemoryStream together.
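    
    A rough sketch of what such a shared abstraction might look like, using only the Scala standard library. `RecordBuckets` and its method names are hypothetical; the idea is that the socket source and ContinuousMemoryStream would each feed records into one of these, and a single RPC endpoint would only translate record requests into `get` calls.
    
    ```scala
    import scala.collection.mutable.ListBuffer

    // Hypothetical source-agnostic buffer: class and method names are assumptions.
    final class RecordBuckets[T](numPartitions: Int) {
      private val buckets = Seq.fill(numPartitions)(new ListBuffer[T])
      private var startOffsets: Seq[Int] = Seq.fill(numPartitions)(0)

      // Append a record to one partition's bucket.
      def append(partition: Int, record: T): Unit = synchronized {
        buckets(partition) += record
      }

      // Look up a record by absolute offset. None means it is not available:
      // either it has not arrived yet or it was already trimmed by a commit.
      def get(partition: Int, offset: Int): Option[T] = synchronized {
        val bufIndex = offset - startOffsets(partition)
        if (bufIndex < 0 || bufIndex >= buckets(partition).size) None
        else Some(buckets(partition)(bufIndex))
      }

      // Drop everything below the committed offsets and advance the start offsets.
      def commit(endOffsets: Seq[Int]): Unit = synchronized {
        endOffsets.zipWithIndex.foreach { case (end, partition) =>
          val n = end - startOffsets(partition)
          require(n >= 0 && n <= buckets(partition).size,
            s"Invalid offset $end to commit for partition $partition")
          buckets(partition).remove(0, n)
        }
        startOffsets = endOffsets
      }
    }
    ```
    
    Keeping the offset arithmetic in one class means the bounds check on commit and the lookup by absolute offset cannot drift apart between the two sources.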


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90349 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90349/testReport)** for PR 21199 at commit [`f010943`](https://github.com/apache/spark/commit/f010943699b184cc9572bda8651cb40d6231bfa3).


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r186765402
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReader.GetRecord
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +
    +  case class GetRecord(offset: TextSocketPartitionOffset)
    +
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new RecordEndpoint()
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[TextSocketPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousDataReaderFactory(
    +          endpointName, i, offset): DataReaderFactory[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        buckets(partition).trimStart(offset - startOffset.offsets(partition))
    +    }
    +    startOffset = endOffset
    --- End diff --
    
    If I understand correctly, this commit path is never exercised by the test case you added.
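
To make the commit path under discussion concrete, here is a minimal sketch of the trimming logic with the Spark types removed. `CommitSketch` and its fields are hypothetical stand-ins for the reader's state, and the bounds check is modelled on the one that appears in a later revision quoted in this thread; it is not the PR's code.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the reader's per-partition buckets and start offsets.
class CommitSketch(numPartitions: Int) {
  val buckets: Seq[ListBuffer[String]] = Seq.fill(numPartitions)(new ListBuffer[String])
  var startOffsets: List[Int] = List.fill(numPartitions)(0)

  // Drops every record below the committed offset and advances the start offsets.
  def commit(endOffsets: List[Int]): Unit = synchronized {
    endOffsets.zipWithIndex.foreach { case (offset, partition) =>
      val n = offset - startOffsets(partition)
      require(n >= 0 && n <= buckets(partition).size,
        s"Invalid offset $offset to commit for partition $partition")
      buckets(partition).trimStart(n)
    }
    startOffsets = endOffsets
  }
}
```

A test that wants to cover this path would need to fill the buckets, call `commit` with offsets strictly greater than the start offsets, and then check both the remaining records and the new start offsets.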


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    ok to test


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r189111878
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -256,7 +258,101 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    -  private class ServerThread extends Thread with Logging {
    +  test("continuous data") {
    +    serverThread = new ServerThread()
    +    serverThread.start()
    +
    +    val reader = new TextSocketContinuousReader(
    +      new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
    +        "port" -> serverThread.port.toString).asJava))
    +    reader.setStartOffset(Optional.empty())
    +    val tasks = reader.planInputPartitions()
    +    assert(tasks.size == 2)
    +
    +    val numRecords = 10
    +    val data = scala.collection.mutable.ListBuffer[Int]()
    +    val offsets = scala.collection.mutable.ListBuffer[Int]()
    +    import org.scalatest.time.SpanSugar._
    +    failAfter(5 seconds) {
    --- End diff --
    
    We could probably use that, but `addSocketData` does not work for the continuous source, and I thought the reader offsets could be validated better this way (following the approach in RateStreamProviderSuite).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90606 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90606/testReport)** for PR 21199 at commit [`b962c3d`](https://github.com/apache/spark/commit/b962c3dbd1715b2d4fa03e65731e36697cf37ff1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207096556
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        val max = startOffset.offsets(partition) + buckets(partition).size
    +        if (offset > max) {
    +          throw new IllegalStateException("Invalid offset " + offset + " to commit" +
    +          " for partition " + partition + ". Max valid offset: " + max)
    +        }
    +        val n = offset - startOffset.offsets(partition)
    +        buckets(partition).trimStart(n)
    +    }
    +    startOffset = endOffset
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    --- End diff --
    
    Yeah, if you're planning to investigate and touch the APIs, that sounds really good. It may be worth filing a new issue.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94011 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94011/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    @arunmahadevan Thanks for rebasing. I'll take a look.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90603 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90603/testReport)** for PR 21199 at commit [`b3a42f0`](https://github.com/apache/spark/commit/b3a42f08cba85b9bec11aaa3f75de298aa869204).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94148/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94148 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94148/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    I think that's unavoidable if we want to have a socket source. The microbatch socket source has the same thing going on. I'd expect most people looking into implementation details of data sources will understand that they ought to read from executors in general.
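
The driver-side reading mentioned above can be sketched as a loop that pulls lines from a reader and spreads them across per-partition buckets. This is an illustration only: `DriverIngestSketch` and `ingest` are hypothetical names, the round-robin assignment is an assumption rather than something stated in this thread, and a `StringReader` stands in for the socket's input stream.

```scala
import java.io.{BufferedReader, StringReader}
import scala.collection.mutable.ListBuffer

// Hypothetical driver-side ingest loop; a real source would wrap the socket's input stream.
object DriverIngestSketch {
  // Reads until end of stream and assigns lines to buckets round-robin (an assumption).
  def ingest(reader: BufferedReader, numPartitions: Int): Seq[ListBuffer[String]] = {
    val buckets = Seq.fill(numPartitions)(new ListBuffer[String])
    var count = 0
    var line = reader.readLine()
    while (line != null) {
      buckets(count % numPartitions) += line
      count += 1
      line = reader.readLine()
    }
    buckets
  }
}
```

For example, `DriverIngestSketch.ingest(new BufferedReader(new StringReader("a\nb\nc\n")), 2)` fills the first bucket with "a" and "c" and the second with "b". Executors would then ask the driver for records by partition and offset instead of touching the socket themselves.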


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90349 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90349/testReport)** for PR 21199 at commit [`f010943`](https://github.com/apache/spark/commit/f010943699b184cc9572bda8651cb40d6231bfa3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r189111977
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +
    +case class ContinuousRecordPartitionOffset(partitionId: Int, offset: Int) extends PartitionOffset
    +case class GetRecord(offset: ContinuousRecordPartitionOffset)
    +
    +/**
    + * A RPC end point for continuous readers to poll for
    + * records from the driver.
    + *
    + * @param buckets the data buckets
    --- End diff --
    
    Added more comments to clarify.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #94321 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94321/testReport)** for PR 21199 at commit [`f4a39d9`](https://github.com/apache/spark/commit/f4a39d9ebae2d6f6ae59caf3140310b17e75b602).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging `


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94011/


---



[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21199#discussion_r207078263
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala ---
    @@ -0,0 +1,295 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.{BufferedReader, InputStreamReader, IOException}
    +import java.net.Socket
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.{Calendar, List => JList, Locale}
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +import org.json4s.{DefaultFormats, NoTypeHints}
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +import org.apache.spark.util.RpcUtils
    +
    +
    +object TextSocketContinuousReader {
    +  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
    +  val SCHEMA_TIMESTAMP = StructType(
    +    StructField("value", StringType)
    +      :: StructField("timestamp", TimestampType) :: Nil)
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    +}
    +
    +/**
    + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and
    + * debugging. This ContinuousReader will *not* work in production applications due to multiple
    + * reasons, including no support for fault recovery.
    + *
    + * The driver maintains a socket connection to the host-port, keeps the received messages in
    + * buckets and serves the messages to the executors via a RPC endpoint.
    + */
    +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader
    +  with SupportsDeprecatedScanRow with Logging {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  private val host: String = options.get("host").get()
    +  private val port: Int = options.get("port").get().toInt
    +
    +  assert(SparkSession.getActiveSession.isDefined)
    +  private val spark = SparkSession.getActiveSession.get
    +  private val numPartitions = spark.sparkContext.defaultParallelism
    +
    +  @GuardedBy("this")
    +  private var socket: Socket = _
    +
    +  @GuardedBy("this")
    +  private var readThread: Thread = _
    +
    +  @GuardedBy("this")
    +  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)])
    +
    +  @GuardedBy("this")
    +  private var currentOffset: Int = -1
    +
    +  private var startOffset: TextSocketOffset = _
    +
    +  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
    +  @volatile private var endpointRef: RpcEndpointRef = _
    +
    +  initialize()
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val offs = offsets
    +      .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
    +      .sortBy(_.partitionId)
    +      .map(_.offset)
    +      .toList
    +    TextSocketOffset(offs)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    TextSocketOffset(Serialization.read[List[Int]](json))
    +  }
    +
    +  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.startOffset = offset
    +      .orElse(TextSocketOffset(List.fill(numPartitions)(0)))
    +      .asInstanceOf[TextSocketOffset]
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  override def getStartOffset: Offset = startOffset
    +
    +  override def readSchema(): StructType = {
    +    if (includeTimestamp) {
    +      TextSocketContinuousReader.SCHEMA_TIMESTAMP
    +    } else {
    +      TextSocketContinuousReader.SCHEMA_REGULAR
    +    }
    +  }
    +
    +  override def planRowInputPartitions(): JList[InputPartition[Row]] = {
    +
    +    val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
    +    endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
    +
    +    val offsets = startOffset match {
    +      case off: TextSocketOffset => off.offsets
    +      case off =>
    +        throw new IllegalArgumentException(
    +          s"invalid offset type ${off.getClass} for TextSocketContinuousReader")
    +    }
    +
    +    if (offsets.size != numPartitions) {
    +      throw new IllegalArgumentException(
    +        s"The previous run contained ${offsets.size} partitions, but" +
    +          s" $numPartitions partitions are currently configured. The numPartitions option" +
    +          " cannot be changed.")
    +    }
    +
    +    startOffset.offsets.zipWithIndex.map {
    +      case (offset, i) =>
    +        TextSocketContinuousInputPartition(
    +          endpointName, i, offset, includeTimestamp): InputPartition[Row]
    +    }.asJava
    +
    +  }
    +
    +  override def commit(end: Offset): Unit = synchronized {
    +    val endOffset = end.asInstanceOf[TextSocketOffset]
    +    endOffset.offsets.zipWithIndex.foreach {
    +      case (offset, partition) =>
    +        val max = startOffset.offsets(partition) + buckets(partition).size
    +        if (offset > max) {
    +          throw new IllegalStateException("Invalid offset " + offset + " to commit" +
    +          " for partition " + partition + ". Max valid offset: " + max)
    +        }
    +        val n = offset - startOffset.offsets(partition)
    +        buckets(partition).trimStart(n)
    +    }
    +    startOffset = endOffset
    +    recordEndpoint.setStartOffsets(startOffset.offsets)
    +  }
    +
    +  /** Stop this source. */
    +  override def stop(): Unit = synchronized {
    +    if (socket != null) {
    +      try {
    +        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
    +        // stop the readThread is to close the socket.
    +        socket.close()
    +      } catch {
    +        case e: IOException =>
    +      }
    +      socket = null
    +    }
    +    if (endpointRef != null) recordEndpoint.rpcEnv.stop(endpointRef)
    +  }
    +
    +  private def initialize(): Unit = synchronized {
    +    socket = new Socket(host, port)
    --- End diff --
    
    We could probably refactor and use common code, but the usages are slightly different, and I would like to do this outside the scope of this PR. I would like to identify and pull out some generic APIs that both micro-batch and continuous sources can implement, so that such duplication can be avoided in general. With the current approach there are always two separate implementations for each source type, so the chance of duplication is higher.
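
For illustration only, here is a minimal sketch of the kind of shared piece such a refactoring might extract: the per-partition buffering and trim-on-commit bookkeeping that appears in `commit` in the diff above. The class `PartitionedBuffer` and its methods are hypothetical names and are not part of this PR or of Spark; the sketch also assumes that offsets are plain `Int` counters per partition, as in `TextSocketOffset`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper (not part of Spark or this PR): buffers records per
// partition and drops the committed prefix of each bucket on commit.
final class PartitionedBuffer[T](numPartitions: Int) {
  private val buckets = Seq.fill(numPartitions)(new ListBuffer[T])
  private var startOffsets: List[Int] = List.fill(numPartitions)(0)

  /** Appends a record to the given partition's bucket. */
  def append(partition: Int, record: T): Unit = synchronized {
    buckets(partition) += record
  }

  /** Commits up to `endOffsets`, trimming records below the new start offsets. */
  def commit(endOffsets: List[Int]): Unit = synchronized {
    require(endOffsets.size == numPartitions,
      s"expected $numPartitions offsets but got ${endOffsets.size}")
    // Validate every partition first so a rejected commit leaves state untouched.
    endOffsets.zipWithIndex.foreach { case (offset, partition) =>
      val min = startOffsets(partition)
      val max = min + buckets(partition).size
      if (offset < min || offset > max) {
        throw new IllegalStateException(
          s"Invalid offset $offset to commit for partition $partition. " +
            s"Valid range: [$min, $max]")
      }
    }
    endOffsets.zipWithIndex.foreach { case (offset, partition) =>
      // Drop the records that are now committed from the head of the bucket.
      buckets(partition).remove(0, offset - startOffsets(partition))
    }
    startOffsets = endOffsets
  }

  /** Records that have been buffered but not yet committed. */
  def pending(partition: Int): List[T] = synchronized { buckets(partition).toList }

  /** Offsets of the first uncommitted record in each partition. */
  def currentStartOffsets: List[Int] = synchronized { startOffsets }
}
```

A micro-batch reader and a continuous reader could each hold one such buffer and differ only in how records are served to tasks. Unlike the diff, this sketch also rejects offsets below the current start offset; whether that check is desirable is a design choice and not something the PR states.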


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90606/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90349/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93921/


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    retest this please


---



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21199
  
    **[Test build #90744 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90744/testReport)** for PR 21199 at commit [`242bcdb`](https://github.com/apache/spark/commit/242bcdbacad2b3133447ba7e7cef100567ed3fd0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class TextSocketContinuousInputPartition(`
      * `class TextSocketContinuousInputPartitionReader(`


---
