You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/31 05:26:51 UTC
[spark] branch master updated: [SPARK-25382][SQL][PYSPARK] Remove
ImageSchema.readImages in 3.0
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a745381 [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
a745381 is described below
commit a745381b9d3dd290057ef3089de7fdb9264f1f8b
Author: WeichenXu <we...@databricks.com>
AuthorDate: Wed Jul 31 14:26:18 2019 +0900
[SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
## What changes were proposed in this pull request?
I removed the deprecated `ImageSchema.readImages`.
I moved some useful methods from class `ImageSchema` into class `ImageFileFormat`.
In PySpark, I renamed the `ImageSchema` class to `ImageUtils` and kept some useful Python methods in it.
## How was this patch tested?
Unit tests.
Please review https://spark.apache.org/contributing.html before opening a pull request.
Closes #25245 from WeichenXu123/remove_image_schema.
Authored-by: WeichenXu <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../org/apache/spark/ml/image/ImageSchema.scala | 72 ---------
.../apache/spark/ml/image/ImageSchemaSuite.scala | 171 ---------------------
.../ml/source/image/ImageFileFormatSuite.scala | 18 +++
project/MimaExcludes.scala | 6 +-
python/pyspark/ml/image.py | 38 -----
python/pyspark/ml/tests/test_image.py | 29 ++--
6 files changed, 42 insertions(+), 292 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
index a7ddf2f..0313626 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
@@ -191,76 +191,4 @@ object ImageSchema {
Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
}
}
-
- /**
- * Read the directory of images from the local or remote source
- *
- * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
- * there may be a race condition where one job overwrites the hadoop configs of another.
- * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
- * potentially non-deterministic.
- *
- * @param path Path to the image directory
- * @return DataFrame with a single column "image" of images;
- * see ImageSchema for the details
- */
- @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
- "removed in 3.0.0.", "2.4.0")
- def readImages(path: String): DataFrame = readImages(path, null, false, -1, false, 1.0, 0)
-
- /**
- * Read the directory of images from the local or remote source
- *
- * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
- * there may be a race condition where one job overwrites the hadoop configs of another.
- * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
- * potentially non-deterministic.
- *
- * @param path Path to the image directory
- * @param sparkSession Spark Session, if omitted gets or creates the session
- * @param recursive Recursive path search flag
- * @param numPartitions Number of the DataFrame partitions,
- * if omitted uses defaultParallelism instead
- * @param dropImageFailures Drop the files that are not valid images from the result
- * @param sampleRatio Fraction of the files loaded
- * @return DataFrame with a single column "image" of images;
- * see ImageSchema for the details
- */
- @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
- "removed in 3.0.0.", "2.4.0")
- def readImages(
- path: String,
- sparkSession: SparkSession,
- recursive: Boolean,
- numPartitions: Int,
- dropImageFailures: Boolean,
- sampleRatio: Double,
- seed: Long): DataFrame = {
- require(sampleRatio <= 1.0 && sampleRatio >= 0, "sampleRatio should be between 0 and 1")
-
- val session = if (sparkSession != null) sparkSession else SparkSession.builder().getOrCreate
- val partitions =
- if (numPartitions > 0) {
- numPartitions
- } else {
- session.sparkContext.defaultParallelism
- }
-
- RecursiveFlag.withRecursiveFlag(recursive, session) {
- SamplePathFilter.withPathFilter(sampleRatio, session, seed) {
- val binResult = session.sparkContext.binaryFiles(path, partitions)
- val streams = if (numPartitions == -1) binResult else binResult.repartition(partitions)
- val convert = (origin: String, bytes: PortableDataStream) =>
- decode(origin, bytes.toArray())
- val images = if (dropImageFailures) {
- streams.flatMap { case (origin, bytes) => convert(origin, bytes) }
- } else {
- streams.map { case (origin, bytes) =>
- convert(origin, bytes).getOrElse(invalidImageRow(origin))
- }
- }
- session.createDataFrame(images, imageSchema)
- }
- }
- }
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala
deleted file mode 100644
index e16ec906..0000000
--- a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.image
-
-import java.nio.file.Paths
-import java.util.Arrays
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.image.ImageSchema._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
-
-class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
- // Single column of images named "image"
- private lazy val imagePath = "../data/mllib/images/origin"
-
- test("Smoke test: create basic ImageSchema dataframe") {
- val origin = "path"
- val width = 1
- val height = 1
- val nChannels = 3
- val data = Array[Byte](0, 0, 0)
- val mode = ocvTypes("CV_8UC3")
-
- // Internal Row corresponds to image StructType
- val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)),
- Row(Row(null, height, width, nChannels, mode, data)))
- val rdd = sc.makeRDD(rows)
- val df = spark.createDataFrame(rdd, ImageSchema.imageSchema)
-
- assert(df.count === 2, "incorrect image count")
- assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
- }
-
- test("readImages count test") {
- var df = readImages(imagePath)
- assert(df.count === 1)
-
- df = readImages(imagePath, null, true, -1, false, 1.0, 0)
- assert(df.count === 10)
-
- df = readImages(imagePath, null, true, -1, true, 1.0, 0)
- val countTotal = df.count
- assert(countTotal === 8)
-
- df = readImages(imagePath, null, true, -1, true, 0.5, 0)
- // Random number about half of the size of the original dataset
- val count50 = df.count
- assert(count50 > 0 && count50 < countTotal)
- }
-
- test("readImages test: recursive = false") {
- val df = readImages(imagePath, null, false, 3, true, 1.0, 0)
- assert(df.count() === 0)
- }
-
- test("readImages test: read jpg image") {
- val df = readImages(imagePath + "/kittens/DP153539.jpg", null, false, 3, true, 1.0, 0)
- assert(df.count() === 1)
- }
-
- test("readImages test: read png image") {
- val df = readImages(imagePath + "/multi-channel/BGRA.png", null, false, 3, true, 1.0, 0)
- assert(df.count() === 1)
- }
-
- test("readImages test: read non image") {
- val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, true, 1.0, 0)
- assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
- assert(df.count() === 0)
- }
-
- test("readImages test: read non image and dropImageFailures is false") {
- val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, false, 1.0, 0)
- assert(df.count() === 1)
- }
-
- test("readImages test: sampleRatio > 1") {
- val e = intercept[IllegalArgumentException] {
- readImages(imagePath, null, true, 3, true, 1.1, 0)
- }
- assert(e.getMessage.contains("sampleRatio"))
- }
-
- test("readImages test: sampleRatio < 0") {
- val e = intercept[IllegalArgumentException] {
- readImages(imagePath, null, true, 3, true, -0.1, 0)
- }
- assert(e.getMessage.contains("sampleRatio"))
- }
-
- test("readImages test: sampleRatio = 0") {
- val df = readImages(imagePath, null, true, 3, true, 0.0, 0)
- assert(df.count() === 0)
- }
-
- test("readImages test: with sparkSession") {
- val df = readImages(imagePath, sparkSession = spark, true, 3, true, 1.0, 0)
- assert(df.count() === 8)
- }
-
- test("readImages partition test") {
- val df = readImages(imagePath, null, true, 3, true, 1.0, 0)
- assert(df.rdd.getNumPartitions === 3)
- }
-
- test("readImages partition test: < 0") {
- val df = readImages(imagePath, null, true, -3, true, 1.0, 0)
- assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism)
- }
-
- test("readImages partition test: = 0") {
- val df = readImages(imagePath, null, true, 0, true, 1.0, 0)
- assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism)
- }
-
- // Images with the different number of channels
- test("readImages pixel values test") {
-
- val images = readImages(imagePath + "/multi-channel/").collect
-
- images.foreach { rrow =>
- val row = rrow.getAs[Row](0)
- val filename = Paths.get(getOrigin(row)).getFileName().toString()
- if (firstBytes20.contains(filename)) {
- val mode = getMode(row)
- val bytes20 = getData(row).slice(0, 20)
-
- val (expectedMode, expectedBytes) = firstBytes20(filename)
- assert(ocvTypes(expectedMode) === mode, "mode of the image is not read correctly")
- assert(Arrays.equals(expectedBytes, bytes20), "incorrect numeric value for flattened image")
- }
- }
- }
-
- // number of channels and first 20 bytes of OpenCV representation
- // - default representation for 3-channel RGB images is BGR row-wise:
- // (B00, G00, R00, B10, G10, R10, ...)
- // - default representation for 4-channel RGB images is BGRA row-wise:
- // (B00, G00, R00, A00, B10, G10, R10, A10, ...)
- private val firstBytes20 = Map(
- "grayscale.jpg" ->
- (("CV_8UC1", Array[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62,
- -57, -60, -63, -53, -49, -55, -69))),
- "chr30.4.184.jpg" -> (("CV_8UC3",
- Array[Byte](-9, -3, -1, -43, -32, -28, -75, -60, -57, -78, -59, -56, -74, -59, -57,
- -71, -58, -56, -73, -64))),
- "BGRA.png" -> (("CV_8UC4",
- Array[Byte](-128, -128, -8, -1, -128, -128, -8, -1, -128,
- -128, -8, -1, 127, 127, -9, -1, 127, 127, -9, -1))),
- "BGRA_alpha_60.png" -> (("CV_8UC4",
- Array[Byte](-128, -128, -8, 60, -128, -128, -8, 60, -128,
- -128, -8, 60, 127, 127, -9, 60, 127, 127, -9, 60)))
- )
-}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 38bb246..0ec2747 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -32,6 +32,24 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
private lazy val imagePath = "../data/mllib/images/partitioned"
private lazy val recursiveImagePath = "../data/mllib/images"
+ test("Smoke test: create basic ImageSchema dataframe") {
+ val origin = "path"
+ val width = 1
+ val height = 1
+ val nChannels = 3
+ val data = Array[Byte](0, 0, 0)
+ val mode = ocvTypes("CV_8UC3")
+
+ // Internal Row corresponds to image StructType
+ val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)),
+ Row(Row(null, height, width, nChannels, mode, data)))
+ val rdd = sc.makeRDD(rows)
+ val df = spark.createDataFrame(rdd, imageSchema)
+
+ assert(df.count === 2, "incorrect image count")
+ assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
+ }
+
test("image datasource count test") {
val df1 = spark.read.format("image").load(imagePath)
assert(df1.count === 9)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 51d5861..4afca5a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -380,7 +380,11 @@ object MimaExcludes {
// [SPARK-28556][SQL] QueryExecutionListener should also notify Error
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
+
+ // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages")
)
// Exclude rules for 2.4.x
diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py
index a1aacea..4fb1036 100644
--- a/python/pyspark/ml/image.py
+++ b/python/pyspark/ml/image.py
@@ -203,44 +203,6 @@ class _ImageSchema(object):
return _create_row(self.imageFields,
[origin, height, width, nChannels, mode, data])
- def readImages(self, path, recursive=False, numPartitions=-1,
- dropImageFailures=False, sampleRatio=1.0, seed=0):
- """
- Reads the directory of images from the local or remote source.
-
- .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag,
- there may be a race condition where one job overwrites the hadoop configs of another.
-
- .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
- potentially non-deterministic.
-
- .. note:: Deprecated in 2.4.0. Use `spark.read.format("image").load(path)` instead and
- this `readImages` will be removed in 3.0.0.
-
- :param str path: Path to the image directory.
- :param bool recursive: Recursive search flag.
- :param int numPartitions: Number of DataFrame partitions.
- :param bool dropImageFailures: Drop the files that are not valid images.
- :param float sampleRatio: Fraction of the images loaded.
- :param int seed: Random number seed.
- :return: a :class:`DataFrame` with a single column of "images",
- see ImageSchema for details.
-
- >>> df = ImageSchema.readImages('data/mllib/images/origin/kittens', recursive=True)
- >>> df.count()
- 5
-
- .. versionadded:: 2.3.0
- """
- warnings.warn("`ImageSchema.readImage` is deprecated. " +
- "Use `spark.read.format(\"image\").load(path)` instead.", DeprecationWarning)
- spark = SparkSession.builder.getOrCreate()
- image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema
- jsession = spark._jsparkSession
- jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
- dropImageFailures, float(sampleRatio), seed)
- return DataFrame(jresult, spark._wrapped)
-
ImageSchema = _ImageSchema()
diff --git a/python/pyspark/ml/tests/test_image.py b/python/pyspark/ml/tests/test_image.py
index 95efa73..0008b0b 100644
--- a/python/pyspark/ml/tests/test_image.py
+++ b/python/pyspark/ml/tests/test_image.py
@@ -24,18 +24,24 @@ from pyspark.sql import HiveContext, Row
from pyspark.testing.utils import QuietTest
-class ImageReaderTest(SparkSessionTestCase):
+class ImageFileFormatTest(SparkSessionTestCase):
def test_read_images(self):
data_path = 'data/mllib/images/origin/kittens'
- df = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+ df = self.spark.read.format("image") \
+ .option("dropInvalid", True) \
+ .option("recursiveFileLookup", True) \
+ .load(data_path)
self.assertEqual(df.count(), 4)
first_row = df.take(1)[0][0]
+ # compare `schema.simpleString()` instead of comparing the schemas directly,
+ # because the df loaded from the data source may change the column nullability in the schema.
+ self.assertEqual(df.schema.simpleString(), ImageSchema.imageSchema.simpleString())
+ self.assertEqual(df.schema["image"].dataType.simpleString(),
+ ImageSchema.columnSchema.simpleString())
array = ImageSchema.toNDArray(first_row)
self.assertEqual(len(array), first_row[1])
self.assertEqual(ImageSchema.toImage(array, origin=first_row[0]), first_row)
- self.assertEqual(df.schema, ImageSchema.imageSchema)
- self.assertEqual(df.schema["image"].dataType, ImageSchema.columnSchema)
expected = {'CV_8UC3': 16, 'Undefined': -1, 'CV_8U': 0, 'CV_8UC1': 0, 'CV_8UC4': 24}
self.assertEqual(ImageSchema.ocvTypes, expected)
expected = ['origin', 'height', 'width', 'nChannels', 'mode', 'data']
@@ -61,11 +67,11 @@ class ImageReaderTest(SparkSessionTestCase):
lambda: ImageSchema.toImage("a"))
-class ImageReaderTest2(PySparkTestCase):
+class ImageFileFormatOnHiveContextTest(PySparkTestCase):
@classmethod
def setUpClass(cls):
- super(ImageReaderTest2, cls).setUpClass()
+ super(ImageFileFormatOnHiveContextTest, cls).setUpClass()
cls.hive_available = True
# Note that here we enable Hive's support.
cls.spark = None
@@ -86,17 +92,20 @@ class ImageReaderTest2(PySparkTestCase):
@classmethod
def tearDownClass(cls):
- super(ImageReaderTest2, cls).tearDownClass()
+ super(ImageFileFormatOnHiveContextTest, cls).tearDownClass()
if cls.spark is not None:
cls.spark.sparkSession.stop()
cls.spark = None
def test_read_images_multiple_times(self):
- # This test case is to check if `ImageSchema.readImages` tries to
+ # This test case is to check if ImageFileFormat tries to
# initiate Hive client multiple times. See SPARK-22651.
data_path = 'data/mllib/images/origin/kittens'
- ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
- ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+ for i in range(2):
+ self.spark.read.format("image") \
+ .option("dropInvalid", True) \
+ .option("recursiveFileLookup", True) \
+ .load(data_path)
if __name__ == "__main__":
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org