Posted to commits@spark.apache.org by td...@apache.org on 2017/03/31 17:55:27 UTC

spark git commit: [SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec

Repository: spark
Updated Branches:
  refs/heads/master b2349e6a0 -> 567a50acf


[SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec

## What changes were proposed in this pull request?

- The state encoder's deserializer must be resolved at the driver, where the state class is defined; otherwise there are corner cases involving nested classes where resolving it at the executor can fail.

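A minimal sketch of the pattern (illustrative names only; the real change is in `FlatMapGroupsWithStateExec` below). The deserializer is resolved and bound once, at operator construction time on the driver, and executors evaluate only the already-resolved expression:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Illustrative sketch: `StatefulOp` stands in for the real physical operator.
class StatefulOp[S](stateEncoder: ExpressionEncoder[S]) extends Serializable {
  // Resolved and bound eagerly, on the driver, where the (possibly nested)
  // state class is guaranteed to be loadable.
  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
  // Executor-side code evaluates the captured, already-resolved expression
  // instead of calling resolveAndBind() inside the task.
}
```
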
- Fixed a flaky test related to processing-time timeout. The flakiness was caused by a race condition between the test thread (which adds data to the memory source) and the streaming query thread. When testing with the manual clock, the goal is to add data and increment the clock together atomically, so that a trigger sees the new data AND the updated clock simultaneously (both or neither). This fix adds additional synchronization when adding data: it ensures that the streaming query thread is already waiting on the manual clock to be incremented (i.e., no batch is currently running) before data is added.

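With this synchronization, a manual-clock test can pair `AddData` with `AdvanceManualClock` and rely on both being observed by the same trigger. A sketch adapted from the `FlatMapGroupsWithStateSuite` change below (it assumes the `result` stream, `inputData` source, and manual `clock` set up earlier in that test):

```scala
testStream(result, Update)(
  StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
  AddData(inputData, "c"),        // test thread blocks until the query is
                                  // parked on the manual clock (no batch running)
  AdvanceManualClock(11 * 1000),  // data and clock advance land in one trigger
  CheckLastBatch(("b", "-1"), ("c", "1"))
)
```
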
- Added `testQuietly` to some tests that generate a lot of error logs.

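For reference, a log-suppressing wrapper of this shape can be sketched as below, assuming a ScalaTest-style `test(name)(body)` registrar and log4j 1.x; Spark's actual `testQuietly` lives in its shared test utilities and may differ in detail:

```scala
import org.apache.log4j.{Level, Logger}

def testQuietly(name: String)(body: => Unit): Unit = test(name) {
  val rootLogger = Logger.getRootLogger
  val originalLevel = rootLogger.getLevel
  // Silence the expected error spam for the duration of the test body.
  rootLogger.setLevel(Level.OFF)
  try body finally rootLogger.setLevel(originalLevel)
}
```
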
## How was this patch tested?
Multiple runs of existing unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #17488 from tdas/SPARK-20165.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/567a50ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/567a50ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/567a50ac

Branch: refs/heads/master
Commit: 567a50acfb0ae26bd430c290348886d494963696
Parents: b2349e6
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Mar 31 10:58:43 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Mar 31 10:58:43 2017 -0700

----------------------------------------------------------------------
 .../streaming/FlatMapGroupsWithStateExec.scala  | 28 +++++++++++---------
 .../sql/streaming/FileStreamSourceSuite.scala   |  4 +--
 .../streaming/FlatMapGroupsWithStateSuite.scala |  7 ++---
 .../spark/sql/streaming/StreamSuite.scala       |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 23 ++++++++++++++--
 .../sql/streaming/StreamingQuerySuite.scala     |  2 +-
 6 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea..e42df5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
 
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
       ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
     private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
 
-    // Converter for translating state rows to Java objects
+    // Converters for translating state between rows and Java objects
     private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-      stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-    // Converter for translating state Java objects to rows
-    private val stateSerializer = {
-      val encoderSerializer = stateEncoder.namedExpressions
-      if (isTimeoutEnabled) {
-        encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-      } else {
-        encoderSerializer
-      }
-    }
+      stateDeserializer, stateAttributes)
     private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
 
     // Index of the additional metadata fields in the state row

http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3..171877a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a5..c8e31e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),
 
       StopStream,
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
 
       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6..388f154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null
 

http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791..951ff2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
           case a: AddData =>
             try {
-              // Add data and get the source where it was added, and the expected offset of the
-              // added data.
+
+              // If the query is running with manual clock, then wait for the stream execution
+              // thread to start waiting for the clock to increment. This is needed so that we
+              // are adding data when there is no trigger that is active. This would ensure that
+              // the data gets deterministically added to the next batch triggered after the manual
+              // clock is incremented in the following AdvanceManualClock. This avoids race conditions
+              // between the test thread and the stream execution thread in tests using manual
+              // clock.
+              if (currentStream != null &&
+                  currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+                val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+                eventually("Error while synchronizing with manual clock before adding data") {
+                  if (currentStream.isActive) {
+                    assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+                  }
+                }
+                if (!currentStream.isActive) {
+                  failTest("Query terminated while synchronizing with manual clock")
+                }
+              }
+              // Add data
               val queryToUse = Option(currentStream).orElse(Option(lastStream))
               val (source, offset) = a.addData(queryToUse)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/567a50ac/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecd..1172531 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)

