You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/03/31 17:55:27 UTC
spark git commit: [SPARK-20165][SS] Resolve state encoder's
deserializer in driver in FlatMapGroupsWithStateExec
Repository: spark
Updated Branches:
refs/heads/master b2349e6a0 -> 567a50acf
[SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec
## What changes were proposed in this pull request?
- Encoder's deserializer must be resolved at the driver where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail.
- Fixed a flaky test related to processing-time timeout. The flakiness was caused by a race condition between the test thread (which adds data to the memory source) and the streaming query thread. When testing with the manual clock, the goal is to add data and increment the clock together atomically, so that a trigger sees both the new data AND the updated clock simultaneously (both or neither). This fix adds additional synchronization when adding data: before adding data, it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running).
- Added `testQuietly` on some tests that generate a lot of error logs.
## How was this patch tested?
Multiple runs of existing unit tests
Author: Tathagata Das <ta...@gmail.com>
Closes #17488 from tdas/SPARK-20165.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/567a50ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/567a50ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/567a50ac
Branch: refs/heads/master
Commit: 567a50acfb0ae26bd430c290348886d494963696
Parents: b2349e6
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Mar 31 10:58:43 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Mar 31 10:58:43 2017 -0700
----------------------------------------------------------------------
.../streaming/FlatMapGroupsWithStateExec.scala | 28 +++++++++++---------
.../sql/streaming/FileStreamSourceSuite.scala | 4 +--
.../streaming/FlatMapGroupsWithStateSuite.scala | 7 ++---
.../spark/sql/streaming/StreamSuite.scala | 2 +-
.../apache/spark/sql/streaming/StreamTest.scala | 23 ++++++++++++++--
.../sql/streaming/StreamingQuerySuite.scala | 2 +-
6 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea..e42df5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
val encSchemaAttribs = stateEncoder.schema.toAttributes
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
+ // Get the serializer for the state, taking into account whether we need to save timestamps
+ private val stateSerializer = {
+ val encoderSerializer = stateEncoder.namedExpressions
+ if (isTimeoutEnabled) {
+ encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+ } else {
+ encoderSerializer
+ }
+ }
+ // Get the deserializer for the state. Note that this must be done in the driver, as
+ // resolving and binding of deserializer expressions to the encoded type can be safely done
+ // only in the driver.
+ private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
- // Converter for translating state rows to Java objects
+ // Converters for translating state between rows and Java objects
private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
- stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
- // Converter for translating state Java objects to rows
- private val stateSerializer = {
- val encoderSerializer = stateEncoder.namedExpressions
- if (isTimeoutEnabled) {
- encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
- } else {
- encoderSerializer
- }
- }
+ stateDeserializer, stateAttributes)
private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
// Index of the additional metadata fields in the state row
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3..171877a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
- test("max files per trigger - incorrect values") {
+ testQuietly("max files per trigger - incorrect values") {
val testTable = "maxFilesPerTrigger_test"
withTable(testTable) {
withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
import testImplicits._
- test("file source stress test") {
+ testQuietly("file source stress test") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a5..c8e31e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
import java.util.concurrent.ConcurrentHashMap
import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assertNumStateRows(total = 1, updated = 2),
StopStream,
- StartStream(ProcessingTime("1 second"), triggerClock = clock),
- AdvanceManualClock(10 * 1000),
+ StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, "c"),
- AdvanceManualClock(1 * 1000),
+ AdvanceManualClock(11 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6..388f154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
CheckAnswer((1, 2), (2, 2), (3, 2)))
}
- test("recover from a Spark v2.1 checkpoint") {
+ testQuietly("recover from a Spark v2.1 checkpoint") {
var inputData: MemoryStream[Int] = null
var query: DataStreamWriter[Row] = null
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791..951ff2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case a: AddData =>
try {
- // Add data and get the source where it was added, and the expected offset of the
- // added data.
+
+ // If the query is running with manual clock, then wait for the stream execution
+ // thread to start waiting for the clock to increment. This is needed so that we
+ // are adding data when there is no trigger that is active. This would ensure that
+ // the data gets deterministically added to the next batch triggered after the manual
// clock is incremented in the following AdvanceManualClock. This avoids race conditions
+ // between the test thread and the stream execution thread in tests using manual
+ // clock.
+ if (currentStream != null &&
+ currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+ val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+ eventually("Error while synchronizing with manual clock before adding data") {
+ if (currentStream.isActive) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (!currentStream.isActive) {
+ failTest("Query terminated while synchronizing with manual clock")
+ }
+ }
+ // Add data
val queryToUse = Option(currentStream).orElse(Option(lastStream))
val (source, offset) = a.addData(queryToUse)
http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecd..1172531 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
- test("StreamingQuery should be Serializable but cannot be used in executors") {
+ testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
ds.writeStream
.queryName(queryName)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org