Posted to commits@spark.apache.org by li...@apache.org on 2018/05/21 21:21:08 UTC

spark git commit: [SPARK-24325] Tests for Hadoop's LinesReader

Repository: spark
Updated Branches:
  refs/heads/master ffaefe755 -> b550b2a1a


[SPARK-24325] Tests for Hadoop's LinesReader

## What changes were proposed in this pull request?

The tests cover the basic functionality of [Hadoop LinesReader](https://github.com/apache/spark/blob/8d79113b812a91073d2c24a3a9ad94cc3b90b24a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala#L42). In particular, the added tests check the following (a minimal usage sketch of the reader follows the list):

- A split slices a line or a delimiter
- A split slices two consecutive lines and covers the delimiter between them
- Two splits slice a line and no duplicates are produced
- The internal buffer size (`io.file.buffer.size`) is smaller than the line length
- The maximum line length constraint (`mapreduce.input.linerecordreader.line.maxlength`) is enforced
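
For context, the reader under test is driven per split: a `PartitionedFile` describes a byte range `(start, length)` of a file, and `HadoopFileLinesReader` iterates over the lines that start inside that range. Below is a minimal sketch of that usage, mirroring the `getLines` helper added by the suite; the file path and contents are made up for illustration:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}

// Hypothetical file with two lines separated by "\r\n".
val path = Paths.get("/tmp/lines.txt")
Files.write(path, "a\r\nb".getBytes(StandardCharsets.UTF_8))

// Split the 4-byte file into two ranges and read each range with its own reader,
// as the suite does for every (start, length) pair.
val ranges = Seq((0L, 2L), (2L, 2L))
val lines = ranges.flatMap { case (start, length) =>
  val file = PartitionedFile(InternalRow.empty, path.toString, start, length)
  val reader = new HadoopFileLinesReader(file, None, new Configuration())
  reader.map(_.toString)
}
// Expected: Seq("a", "b") - the delimiter cut by the split boundary does not
// cause a duplicated or lost line.
```

The invariant exercised by the tests is that a line belongs to the split in which it starts, so lines and delimiters cut by split boundaries are neither duplicated nor dropped.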

Author: Maxim Gekk <ma...@databricks.com>

Closes #21377 from MaxGekk/line-reader-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b550b2a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b550b2a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b550b2a1

Branch: refs/heads/master
Commit: b550b2a1a159941c7327973182f16004a6bf179d
Parents: ffaefe7
Author: Maxim Gekk <ma...@databricks.com>
Authored: Mon May 21 14:21:05 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon May 21 14:21:05 2018 -0700

----------------------------------------------------------------------
 .../HadoopFileLinesReaderSuite.scala            | 137 +++++++++++++++++++
 1 file changed, 137 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b550b2a1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
new file mode 100644
index 0000000..a39a25b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.test.SharedSQLContext
+
+class HadoopFileLinesReaderSuite extends SharedSQLContext {
+  def getLines(
+      path: File,
+      text: String,
+      ranges: Seq[(Long, Long)],
+      delimiter: Option[String] = None,
+      conf: Option[Configuration] = None): Seq[String] = {
+    val delimOpt = delimiter.map(_.getBytes(StandardCharsets.UTF_8))
+    Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8))
+
+    val lines = ranges.map { case (start, length) =>
+      val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
+      val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration)
+      val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
+
+      reader.map(_.toString)
+    }.flatten
+
+    lines
+  }
+
+  test("A split ends at the delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 1), (1, 3)))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split cuts the delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 2), (2, 2)))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split ends at the end of the delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 3), (3, 1)))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split covers two lines") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a\r\nb", ranges = Seq((0, 4), (4, 1)))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split ends at the custom delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 1), (1, 4)), Some("^_^"))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split slices the custom delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 2), (2, 3)), Some("^_^"))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("The first split covers the first line and the custom delimiter") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "a^_^b", ranges = Seq((0, 4), (4, 1)), Some("^_^"))
+      assert(lines == Seq("a", "b"))
+    }
+  }
+
+  test("A split cuts the first line") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "abc,def", ranges = Seq((0, 1)), Some(","))
+      assert(lines == Seq("abc"))
+    }
+  }
+
+  test("The split cuts both lines") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "abc,def", ranges = Seq((2, 2)), Some(","))
+      assert(lines == Seq("def"))
+    }
+  }
+
+  test("io.file.buffer.size is less than line length") {
+    val conf = spark.sparkContext.hadoopConfiguration
+    conf.set("io.file.buffer.size", "2")
+    withTempPath { path =>
+      val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5)))
+      assert(lines == Seq("123456"))
+    }
+  }
+
+  test("line cannot be longer than line.maxlength") {
+    val conf = spark.sparkContext.hadoopConfiguration
+    conf.set("mapreduce.input.linerecordreader.line.maxlength", "5")
+    withTempPath { path =>
+      val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15)))
+      assert(lines == Seq("1234"))
+    }
+  }
+
+  test("default delimiter is 0xd or 0xa or 0xd0xa") {
+    withTempPath { path =>
+      val lines = getLines(path, text = "1\r2\n3\r\n4", ranges = Seq((0, 3), (3, 5)))
+      assert(lines == Seq("1", "2", "3", "4"))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org