Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/22 05:14:08 UTC
spark git commit: [SPARK-18425][STRUCTURED STREAMING][TESTS] Test `CompactibleFileStreamLog` directly
Repository: spark
Updated Branches:
refs/heads/master 97a8239a6 -> ebeb0830a
[SPARK-18425][STRUCTURED STREAMING][TESTS] Test `CompactibleFileStreamLog` directly
## What changes were proposed in this pull request?
Right now we are testing most of `CompactibleFileStreamLog` in `FileStreamSinkLogSuite` (because `FileStreamSinkLog` was once the only subclass of `CompactibleFileStreamLog`, but that is no longer the case).
Let's refactor the tests so that `CompactibleFileStreamLog` is tested directly, making future changes to `CompactibleFileStreamLog` (like https://github.com/apache/spark/pull/15828 and https://github.com/apache/spark/pull/15827) much easier to test and much easier to review.
## How was this patch tested?
The PR itself consists of test changes only.
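
For reviewers' context, here is a minimal sketch of the batch-id arithmetic that the relocated tests exercise. It is re-derived from the expected values in the tests below, not copied from `CompactibleFileStreamLog` itself; it assumes zero-based batch ids, so with compactInterval = 3 batches 2, 5, 8, ... are compaction batches:

    // Sketch only: reconstructed from the assertions in the suite,
    // not the actual Spark implementation.
    def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
      (batchId + 1) % compactInterval == 0

    def nextCompactionBatchId(batchId: Long, compactInterval: Int): Long =
      (batchId + compactInterval + 1) / compactInterval * compactInterval - 1

    // e.g. isCompactionBatch(2, 3) == true, isCompactionBatch(3, 3) == false,
    //      nextCompactionBatchId(3, 3) == 5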
Author: Liwei Lin <lw...@gmail.com>
Closes #15870 from lw-lin/test-compact-1113.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebeb0830
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebeb0830
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebeb0830
Branch: refs/heads/master
Commit: ebeb0830a3a4837c7354a0eee667b9f5fad389c5
Parents: 97a8239
Author: Liwei Lin <lw...@gmail.com>
Authored: Mon Nov 21 21:14:13 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Nov 21 21:14:13 2016 -0800
----------------------------------------------------------------------
.../CompactibleFileStreamLogSuite.scala | 216 ++++++++++++++++++-
.../streaming/FileStreamSinkLogSuite.scala | 68 ------
2 files changed, 214 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ebeb0830/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 2cd2157..e511fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -17,12 +17,79 @@
package org.apache.spark.sql.execution.streaming
-import org.apache.spark.SparkFunSuite
+import java.io._
+import java.nio.charset.StandardCharsets._
-class CompactibleFileStreamLogSuite extends SparkFunSuite {
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSQLContext
+
+class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
+
+ /** To avoid caching of FS objects */
+ override protected val sparkConf =
+ new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
import CompactibleFileStreamLog._
+ /** -- testing of `object CompactibleFileStreamLog` begins -- */
+
+ test("getBatchIdFromFileName") {
+ assert(1234L === getBatchIdFromFileName("1234"))
+ assert(1234L === getBatchIdFromFileName("1234.compact"))
+ intercept[NumberFormatException] {
+ getBatchIdFromFileName("1234a")
+ }
+ }
+
+ test("isCompactionBatch") {
+ assert(false === isCompactionBatch(0, compactInterval = 3))
+ assert(false === isCompactionBatch(1, compactInterval = 3))
+ assert(true === isCompactionBatch(2, compactInterval = 3))
+ assert(false === isCompactionBatch(3, compactInterval = 3))
+ assert(false === isCompactionBatch(4, compactInterval = 3))
+ assert(true === isCompactionBatch(5, compactInterval = 3))
+ }
+
+ test("nextCompactionBatchId") {
+ assert(2 === nextCompactionBatchId(0, compactInterval = 3))
+ assert(2 === nextCompactionBatchId(1, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(2, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(3, compactInterval = 3))
+ assert(5 === nextCompactionBatchId(4, compactInterval = 3))
+ assert(8 === nextCompactionBatchId(5, compactInterval = 3))
+ }
+
+ test("getValidBatchesBeforeCompactionBatch") {
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
+ }
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
+ }
+ assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
+ }
+ intercept[AssertionError] {
+ getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
+ }
+ assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
+ }
+
+ test("getAllValidBatches") {
+ assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
+ assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
+ assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
+ assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
+ assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
+ assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
+ assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
+ assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
+ assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
+ }
+
test("deriveCompactInterval") {
// latestCompactBatchId(4) + 1 <= default(5)
// then use latestCompactBatchId + 1 === 5
@@ -30,4 +97,149 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite {
// First divisor of 10 greater than 4 === 5
assert(5 === deriveCompactInterval(4, 9))
}
+
+ /** -- testing of `object CompactibleFileStreamLog` ends -- */
+
+ test("batchIdToPath") {
+ withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs = Long.MaxValue,
+ defaultCompactInterval = 3,
+ compactibleLog => {
+ assert("0" === compactibleLog.batchIdToPath(0).getName)
+ assert("1" === compactibleLog.batchIdToPath(1).getName)
+ assert("2.compact" === compactibleLog.batchIdToPath(2).getName)
+ assert("3" === compactibleLog.batchIdToPath(3).getName)
+ assert("4" === compactibleLog.batchIdToPath(4).getName)
+ assert("5.compact" === compactibleLog.batchIdToPath(5).getName)
+ })
+ }
+
+ test("serialize") {
+ withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs = Long.MaxValue,
+ defaultCompactInterval = 3,
+ compactibleLog => {
+ val logs = Array("entry_1", "entry_2", "entry_3")
+ val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+ |"entry_1"
+ |"entry_2"
+ |"entry_3"""".stripMargin
+ val baos = new ByteArrayOutputStream()
+ compactibleLog.serialize(logs, baos)
+ assert(expected === baos.toString(UTF_8.name()))
+
+ baos.reset()
+ compactibleLog.serialize(Array(), baos)
+ assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+ })
+ }
+
+ test("deserialize") {
+ withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs = Long.MaxValue,
+ defaultCompactInterval = 3,
+ compactibleLog => {
+ val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+ |"entry_1"
+ |"entry_2"
+ |"entry_3"""".stripMargin
+ val expected = Array("entry_1", "entry_2", "entry_3")
+ assert(expected ===
+ compactibleLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
+
+ assert(Nil ===
+ compactibleLog.deserialize(
+ new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+ })
+ }
+
+ testWithUninterruptibleThread("compact") {
+ withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs = Long.MaxValue,
+ defaultCompactInterval = 3,
+ compactibleLog => {
+ for (batchId <- 0 to 10) {
+ compactibleLog.add(batchId, Array("some_path_" + batchId))
+ val expectedFiles = (0 to batchId).map { id => "some_path_" + id }
+ assert(compactibleLog.allFiles() === expectedFiles)
+ if (isCompactionBatch(batchId, 3)) {
+ // Since batchId is a compaction batch, the batch log file should contain all logs
+ assert(compactibleLog.get(batchId).getOrElse(Nil) === expectedFiles)
+ }
+ }
+ })
+ }
+
+ testWithUninterruptibleThread("delete expired file") {
+ // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
+ withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs = 0,
+ defaultCompactInterval = 3,
+ compactibleLog => {
+ val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+ def listBatchFiles(): Set[String] = {
+ fs.listStatus(compactibleLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+ try {
+ getBatchIdFromFileName(fileName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }.toSet
+ }
+
+ compactibleLog.add(0, Array("some_path_0"))
+ assert(Set("0") === listBatchFiles())
+ compactibleLog.add(1, Array("some_path_1"))
+ assert(Set("0", "1") === listBatchFiles())
+ compactibleLog.add(2, Array("some_path_2"))
+ assert(Set("2.compact") === listBatchFiles())
+ compactibleLog.add(3, Array("some_path_3"))
+ assert(Set("2.compact", "3") === listBatchFiles())
+ compactibleLog.add(4, Array("some_path_4"))
+ assert(Set("2.compact", "3", "4") === listBatchFiles())
+ compactibleLog.add(5, Array("some_path_5"))
+ assert(Set("5.compact") === listBatchFiles())
+ })
+ }
+
+ private def withFakeCompactibleFileStreamLog(
+ fileCleanupDelayMs: Long,
+ defaultCompactInterval: Int,
+ f: FakeCompactibleFileStreamLog => Unit
+ ): Unit = {
+ withTempDir { file =>
+ val compactibleLog = new FakeCompactibleFileStreamLog(
+ fileCleanupDelayMs,
+ defaultCompactInterval,
+ spark,
+ file.getCanonicalPath)
+ f(compactibleLog)
+ }
+ }
+}
+
+object FakeCompactibleFileStreamLog {
+ val VERSION = "test_version"
+}
+
+class FakeCompactibleFileStreamLog(
+ _fileCleanupDelayMs: Long,
+ _defaultCompactInterval: Int,
+ sparkSession: SparkSession,
+ path: String)
+ extends CompactibleFileStreamLog[String](
+ FakeCompactibleFileStreamLog.VERSION,
+ sparkSession,
+ path
+ ) {
+
+ override protected def fileCleanupDelayMs: Long = _fileCleanupDelayMs
+
+ override protected def isDeletingExpiredLog: Boolean = true
+
+ override protected def defaultCompactInterval: Int = _defaultCompactInterval
+
+ override def compactLogs(logs: Seq[String]): Seq[String] = logs
}
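
With the `withFakeCompactibleFileStreamLog` harness above, future changes to `CompactibleFileStreamLog` can be covered without going through `FileStreamSinkLog` at all. A hypothetical follow-up test (illustrative only, not part of this commit; it reuses only the `add` and `allFiles` calls shown above) might look like:

    testWithUninterruptibleThread("hypothetical follow-up test") {
      withFakeCompactibleFileStreamLog(
        fileCleanupDelayMs = Long.MaxValue,
        defaultCompactInterval = 3,
        compactibleLog => {
          // exercise any new CompactibleFileStreamLog behaviour directly,
          // with no FileStreamSinkLog involved
          compactibleLog.add(0, Array("some_path_0"))
          assert(compactibleLog.allFiles() === Seq("some_path_0"))
        })
    }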
http://git-wip-us.apache.org/repos/asf/spark/blob/ebeb0830/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e1bc674..e046fee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -29,61 +29,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
import CompactibleFileStreamLog._
import FileStreamSinkLog._
- test("getBatchIdFromFileName") {
- assert(1234L === getBatchIdFromFileName("1234"))
- assert(1234L === getBatchIdFromFileName("1234.compact"))
- intercept[NumberFormatException] {
- getBatchIdFromFileName("1234a")
- }
- }
-
- test("isCompactionBatch") {
- assert(false === isCompactionBatch(0, compactInterval = 3))
- assert(false === isCompactionBatch(1, compactInterval = 3))
- assert(true === isCompactionBatch(2, compactInterval = 3))
- assert(false === isCompactionBatch(3, compactInterval = 3))
- assert(false === isCompactionBatch(4, compactInterval = 3))
- assert(true === isCompactionBatch(5, compactInterval = 3))
- }
-
- test("nextCompactionBatchId") {
- assert(2 === nextCompactionBatchId(0, compactInterval = 3))
- assert(2 === nextCompactionBatchId(1, compactInterval = 3))
- assert(5 === nextCompactionBatchId(2, compactInterval = 3))
- assert(5 === nextCompactionBatchId(3, compactInterval = 3))
- assert(5 === nextCompactionBatchId(4, compactInterval = 3))
- assert(8 === nextCompactionBatchId(5, compactInterval = 3))
- }
-
- test("getValidBatchesBeforeCompactionBatch") {
- intercept[AssertionError] {
- getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
- }
- intercept[AssertionError] {
- getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
- }
- assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
- intercept[AssertionError] {
- getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
- }
- intercept[AssertionError] {
- getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
- }
- assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
- }
-
- test("getAllValidBatches") {
- assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
- assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
- assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
- assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
- assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
- assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
- assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
- assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
- assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
- }
-
test("compactLogs") {
withFileStreamSinkLog { sinkLog =>
val logs = Seq(
@@ -184,19 +129,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("batchIdToPath") {
- withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
- withFileStreamSinkLog { sinkLog =>
- assert("0" === sinkLog.batchIdToPath(0).getName)
- assert("1" === sinkLog.batchIdToPath(1).getName)
- assert("2.compact" === sinkLog.batchIdToPath(2).getName)
- assert("3" === sinkLog.batchIdToPath(3).getName)
- assert("4" === sinkLog.batchIdToPath(4).getName)
- assert("5.compact" === sinkLog.batchIdToPath(5).getName)
- }
- }
- }
-
testWithUninterruptibleThread("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>