You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/22 05:14:08 UTC

spark git commit: [SPARK-18425][STRUCTURED STREAMING][TESTS] Test `CompactibleFileStreamLog` directly

Repository: spark
Updated Branches:
  refs/heads/master 97a8239a6 -> ebeb0830a


[SPARK-18425][STRUCTURED STREAMING][TESTS] Test `CompactibleFileStreamLog` directly

## What changes were proposed in this pull request?

Right now we are testing most of `CompactibleFileStreamLog` in `FileStreamSinkLogSuite` (because `FileStreamSinkLog` was once the only subclass of `CompactibleFileStreamLog`, but that is no longer the case).

Let's refactor the tests so that `CompactibleFileStreamLog` is directly tested, making future changes (like https://github.com/apache/spark/pull/15828, https://github.com/apache/spark/pull/15827) to `CompactibleFileStreamLog` much easier to test and much easier to review.

## How was this patch tested?

The PR itself only refactors and adds tests, so it is covered by running the affected test suites.

Author: Liwei Lin <lw...@gmail.com>

Closes #15870 from lw-lin/test-compact-1113.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebeb0830
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebeb0830
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebeb0830

Branch: refs/heads/master
Commit: ebeb0830a3a4837c7354a0eee667b9f5fad389c5
Parents: 97a8239
Author: Liwei Lin <lw...@gmail.com>
Authored: Mon Nov 21 21:14:13 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Nov 21 21:14:13 2016 -0800

----------------------------------------------------------------------
 .../CompactibleFileStreamLogSuite.scala         | 216 ++++++++++++++++++-
 .../streaming/FileStreamSinkLogSuite.scala      |  68 ------
 2 files changed, 214 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebeb0830/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 2cd2157..e511fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -17,12 +17,79 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.SparkFunSuite
+import java.io._
+import java.nio.charset.StandardCharsets._
 
-class CompactibleFileStreamLogSuite extends SparkFunSuite {
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSQLContext
+
+class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  /** To avoid caching of FS objects */
+  override protected val sparkConf =
+    new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
 
   import CompactibleFileStreamLog._
 
+  /** -- testing of `object CompactibleFileStreamLog` begins -- */
+
+  test("getBatchIdFromFileName") {
+    assert(1234L === getBatchIdFromFileName("1234"))
+    assert(1234L === getBatchIdFromFileName("1234.compact"))
+    intercept[NumberFormatException] {
+      getBatchIdFromFileName("1234a")
+    }
+  }
+
+  test("isCompactionBatch") {
+    assert(false === isCompactionBatch(0, compactInterval = 3))
+    assert(false === isCompactionBatch(1, compactInterval = 3))
+    assert(true === isCompactionBatch(2, compactInterval = 3))
+    assert(false === isCompactionBatch(3, compactInterval = 3))
+    assert(false === isCompactionBatch(4, compactInterval = 3))
+    assert(true === isCompactionBatch(5, compactInterval = 3))
+  }
+
+  test("nextCompactionBatchId") {
+    assert(2 === nextCompactionBatchId(0, compactInterval = 3))
+    assert(2 === nextCompactionBatchId(1, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(2, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(3, compactInterval = 3))
+    assert(5 === nextCompactionBatchId(4, compactInterval = 3))
+    assert(8 === nextCompactionBatchId(5, compactInterval = 3))
+  }
+
+  test("getValidBatchesBeforeCompactionBatch") {
+    intercept[AssertionError] {
+      getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
+    }
+    intercept[AssertionError] {
+      getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
+    }
+    assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
+    intercept[AssertionError] {
+      getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
+    }
+    intercept[AssertionError] {
+      getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
+    }
+    assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
+  }
+
+  test("getAllValidBatches") {
+    assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
+    assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
+    assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
+    assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
+    assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
+    assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
+    assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
+    assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
+    assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
+  }
+
   test("deriveCompactInterval") {
     // latestCompactBatchId(4) + 1 <= default(5)
     // then use latestestCompactBatchId + 1 === 5
@@ -30,4 +97,149 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite {
     // First divisor of 10 greater than 4 === 5
     assert(5 === deriveCompactInterval(4, 9))
   }
+
+  /** -- testing of `object CompactibleFileStreamLog` ends -- */
+
+  test("batchIdToPath") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = Long.MaxValue,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        assert("0" === compactibleLog.batchIdToPath(0).getName)
+        assert("1" === compactibleLog.batchIdToPath(1).getName)
+        assert("2.compact" === compactibleLog.batchIdToPath(2).getName)
+        assert("3" === compactibleLog.batchIdToPath(3).getName)
+        assert("4" === compactibleLog.batchIdToPath(4).getName)
+        assert("5.compact" === compactibleLog.batchIdToPath(5).getName)
+      })
+  }
+
+  test("serialize") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = Long.MaxValue,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        val logs = Array("entry_1", "entry_2", "entry_3")
+        val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+            |"entry_1"
+            |"entry_2"
+            |"entry_3"""".stripMargin
+        val baos = new ByteArrayOutputStream()
+        compactibleLog.serialize(logs, baos)
+        assert(expected === baos.toString(UTF_8.name()))
+
+        baos.reset()
+        compactibleLog.serialize(Array(), baos)
+        assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+      })
+  }
+
+  test("deserialize") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = Long.MaxValue,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+            |"entry_1"
+            |"entry_2"
+            |"entry_3"""".stripMargin
+        val expected = Array("entry_1", "entry_2", "entry_3")
+        assert(expected ===
+          compactibleLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
+
+        assert(Nil ===
+          compactibleLog.deserialize(
+            new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+      })
+  }
+
+  testWithUninterruptibleThread("compact") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = Long.MaxValue,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        for (batchId <- 0 to 10) {
+          compactibleLog.add(batchId, Array("some_path_" + batchId))
+          val expectedFiles = (0 to batchId).map { id => "some_path_" + id }
+          assert(compactibleLog.allFiles() === expectedFiles)
+          if (isCompactionBatch(batchId, 3)) {
+            // Since batchId is a compaction batch, the batch log file should contain all logs
+            assert(compactibleLog.get(batchId).getOrElse(Nil) === expectedFiles)
+          }
+        }
+      })
+  }
+
+  testWithUninterruptibleThread("delete expired file") {
+    // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = 0,
+      defaultCompactInterval = 3,
+      compactibleLog => {
+        val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+        def listBatchFiles(): Set[String] = {
+          fs.listStatus(compactibleLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+            try {
+              getBatchIdFromFileName(fileName)
+              true
+            } catch {
+              case _: NumberFormatException => false
+            }
+          }.toSet
+        }
+
+        compactibleLog.add(0, Array("some_path_0"))
+        assert(Set("0") === listBatchFiles())
+        compactibleLog.add(1, Array("some_path_1"))
+        assert(Set("0", "1") === listBatchFiles())
+        compactibleLog.add(2, Array("some_path_2"))
+        assert(Set("2.compact") === listBatchFiles())
+        compactibleLog.add(3, Array("some_path_3"))
+        assert(Set("2.compact", "3") === listBatchFiles())
+        compactibleLog.add(4, Array("some_path_4"))
+        assert(Set("2.compact", "3", "4") === listBatchFiles())
+        compactibleLog.add(5, Array("some_path_5"))
+        assert(Set("5.compact") === listBatchFiles())
+      })
+  }
+
+  private def withFakeCompactibleFileStreamLog(
+    fileCleanupDelayMs: Long,
+    defaultCompactInterval: Int,
+    f: FakeCompactibleFileStreamLog => Unit
+  ): Unit = {
+    withTempDir { file =>
+      val compactibleLog = new FakeCompactibleFileStreamLog(
+        fileCleanupDelayMs,
+        defaultCompactInterval,
+        spark,
+        file.getCanonicalPath)
+      f(compactibleLog)
+    }
+  }
+}
+
+object FakeCompactibleFileStreamLog {
+  val VERSION = "test_version"
+}
+
+class FakeCompactibleFileStreamLog(
+    _fileCleanupDelayMs: Long,
+    _defaultCompactInterval: Int,
+    sparkSession: SparkSession,
+    path: String)
+  extends CompactibleFileStreamLog[String](
+    FakeCompactibleFileStreamLog.VERSION,
+    sparkSession,
+    path
+  ) {
+
+  override protected def fileCleanupDelayMs: Long = _fileCleanupDelayMs
+
+  override protected def isDeletingExpiredLog: Boolean = true
+
+  override protected def defaultCompactInterval: Int = _defaultCompactInterval
+
+  override def compactLogs(logs: Seq[String]): Seq[String] = logs
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ebeb0830/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index e1bc674..e046fee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -29,61 +29,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
   import CompactibleFileStreamLog._
   import FileStreamSinkLog._
 
-  test("getBatchIdFromFileName") {
-    assert(1234L === getBatchIdFromFileName("1234"))
-    assert(1234L === getBatchIdFromFileName("1234.compact"))
-    intercept[NumberFormatException] {
-      getBatchIdFromFileName("1234a")
-    }
-  }
-
-  test("isCompactionBatch") {
-    assert(false === isCompactionBatch(0, compactInterval = 3))
-    assert(false === isCompactionBatch(1, compactInterval = 3))
-    assert(true === isCompactionBatch(2, compactInterval = 3))
-    assert(false === isCompactionBatch(3, compactInterval = 3))
-    assert(false === isCompactionBatch(4, compactInterval = 3))
-    assert(true === isCompactionBatch(5, compactInterval = 3))
-  }
-
-  test("nextCompactionBatchId") {
-    assert(2 === nextCompactionBatchId(0, compactInterval = 3))
-    assert(2 === nextCompactionBatchId(1, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(2, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(3, compactInterval = 3))
-    assert(5 === nextCompactionBatchId(4, compactInterval = 3))
-    assert(8 === nextCompactionBatchId(5, compactInterval = 3))
-  }
-
-  test("getValidBatchesBeforeCompactionBatch") {
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
-    }
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
-    }
-    assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
-    }
-    intercept[AssertionError] {
-      getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
-    }
-    assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
-  }
-
-  test("getAllValidBatches") {
-    assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
-    assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
-    assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
-    assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
-    assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
-    assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
-    assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
-    assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
-    assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
-  }
-
   test("compactLogs") {
     withFileStreamSinkLog { sinkLog =>
       val logs = Seq(
@@ -184,19 +129,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("batchIdToPath") {
-    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
-      withFileStreamSinkLog { sinkLog =>
-        assert("0" === sinkLog.batchIdToPath(0).getName)
-        assert("1" === sinkLog.batchIdToPath(1).getName)
-        assert("2.compact" === sinkLog.batchIdToPath(2).getName)
-        assert("3" === sinkLog.batchIdToPath(3).getName)
-        assert("4" === sinkLog.batchIdToPath(4).getName)
-        assert("5.compact" === sinkLog.batchIdToPath(5).getName)
-      }
-    }
-  }
-
   testWithUninterruptibleThread("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org