You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/22 09:27:08 UTC

[GitHub] [spark] brkyvz commented on a change in pull request #26018: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState

brkyvz commented on a change in pull request #26018: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState
URL: https://github.com/apache/spark/pull/26018#discussion_r337412331
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 ##########
 @@ -242,6 +243,83 @@ class StreamingQueryManagerSuite extends StreamTest {
     }
   }
 
+  testQuietly("can't start a streaming query with the same name in the same session") {
+    val ds1 = makeDataset._2
+    val ds2 = makeDataset._2
+    val queryName = "abc"
+
+    val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
+    try {
+      val e = intercept[IllegalArgumentException] {
+        ds2.writeStream.format("noop").queryName(queryName).start()
+      }
+      assert(e.getMessage.contains("query with that name is already active"))
+    } finally {
+      query1.stop()
+    }
+  }
+
+  testQuietly("can start a streaming query with the same name in a different session") {
+    val session2 = spark.cloneSession()
+
+    val ds1 = MemoryStream(Encoders.INT, spark.sqlContext).toDS()
+    val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+    val queryName = "abc"
+
+    val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
+    val query2 = ds2.writeStream.format("noop").queryName(queryName).start()
+
+    query1.stop()
+    query2.stop()
+  }
+
+  testQuietly("can't start multiple instances of the same streaming query in the same session") {
+    withTempDir { dir =>
+      val session2 = spark.cloneSession()
+
+      val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+      val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+      val dataLocation = new File(dir, "data").getCanonicalPath
+
+      val query1 = ms1.toDS().writeStream.format("parquet")
+        .option("checkpointLocation", chkLocation).start(dataLocation)
+      ms1.addData(1, 2, 3)
+      try {
+        val e = intercept[IllegalStateException] {
+          ds2.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+        }
+        assert(e.getMessage.contains("same id"))
+      } finally {
+        query1.stop()
+      }
+    }
+  }
+
+  testQuietly(
+    "can't start multiple instances of the same streaming query in the different sessions") {
 
 Review comment:
   good catch. I named them wrong :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org