Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/31 05:26:51 UTC

[spark] branch master updated: [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a745381  [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
a745381 is described below

commit a745381b9d3dd290057ef3089de7fdb9264f1f8b
Author: WeichenXu <we...@databricks.com>
AuthorDate: Wed Jul 31 14:26:18 2019 +0900

    [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
    
    ## What changes were proposed in this pull request?
    
    Remove the deprecated `ImageSchema.readImages` and move some useful methods
    from the `ImageSchema` class into the `ImageFileFormat` class.
    
    In PySpark, rename the `ImageSchema` class to `ImageUtils` and keep some
    useful Python methods in it.
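    
    For users migrating off the removed API, a minimal PySpark sketch of the
    replacement (the option names and sample path come from the test changes
    in this patch; `data_path` is a placeholder):
    
        from pyspark.sql import SparkSession
    
        spark = SparkSession.builder.getOrCreate()
        data_path = "data/mllib/images/origin/kittens"
    
        # Before (removed by this patch):
        #   df = ImageSchema.readImages(data_path, recursive=True,
        #                               dropImageFailures=True)
    
        # After: the built-in "image" data source.
        df = spark.read.format("image") \
            .option("dropInvalid", True) \
            .option("recursiveFileLookup", True) \
            .load(data_path)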
    
    ## How was this patch tested?
    
    Unit tests.
    
    Closes #25245 from WeichenXu123/remove_image_schema.
    
    Authored-by: WeichenXu <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/ml/image/ImageSchema.scala    |  72 ---------
 .../apache/spark/ml/image/ImageSchemaSuite.scala   | 171 ---------------------
 .../ml/source/image/ImageFileFormatSuite.scala     |  18 +++
 project/MimaExcludes.scala                         |   6 +-
 python/pyspark/ml/image.py                         |  38 -----
 python/pyspark/ml/tests/test_image.py              |  29 ++--
 6 files changed, 42 insertions(+), 292 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
index a7ddf2f..0313626 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala
@@ -191,76 +191,4 @@ object ImageSchema {
       Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
     }
   }
-
-  /**
-   * Read the directory of images from the local or remote source
-   *
-   * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
-   * there may be a race condition where one job overwrites the hadoop configs of another.
-   * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
-   * potentially non-deterministic.
-   *
-   * @param path Path to the image directory
-   * @return DataFrame with a single column "image" of images;
-   *         see ImageSchema for the details
-   */
-  @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
-    "removed in 3.0.0.", "2.4.0")
-  def readImages(path: String): DataFrame = readImages(path, null, false, -1, false, 1.0, 0)
-
-  /**
-   * Read the directory of images from the local or remote source
-   *
-   * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag,
-   * there may be a race condition where one job overwrites the hadoop configs of another.
-   * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
-   * potentially non-deterministic.
-   *
-   * @param path Path to the image directory
-   * @param sparkSession Spark Session, if omitted gets or creates the session
-   * @param recursive Recursive path search flag
-   * @param numPartitions Number of the DataFrame partitions,
-   *                      if omitted uses defaultParallelism instead
-   * @param dropImageFailures Drop the files that are not valid images from the result
-   * @param sampleRatio Fraction of the files loaded
-   * @return DataFrame with a single column "image" of images;
-   *         see ImageSchema for the details
-   */
-  @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " +
-    "removed in 3.0.0.", "2.4.0")
-  def readImages(
-      path: String,
-      sparkSession: SparkSession,
-      recursive: Boolean,
-      numPartitions: Int,
-      dropImageFailures: Boolean,
-      sampleRatio: Double,
-      seed: Long): DataFrame = {
-    require(sampleRatio <= 1.0 && sampleRatio >= 0, "sampleRatio should be between 0 and 1")
-
-    val session = if (sparkSession != null) sparkSession else SparkSession.builder().getOrCreate
-    val partitions =
-      if (numPartitions > 0) {
-        numPartitions
-      } else {
-        session.sparkContext.defaultParallelism
-      }
-
-    RecursiveFlag.withRecursiveFlag(recursive, session) {
-      SamplePathFilter.withPathFilter(sampleRatio, session, seed) {
-        val binResult = session.sparkContext.binaryFiles(path, partitions)
-        val streams = if (numPartitions == -1) binResult else binResult.repartition(partitions)
-        val convert = (origin: String, bytes: PortableDataStream) =>
-          decode(origin, bytes.toArray())
-        val images = if (dropImageFailures) {
-          streams.flatMap { case (origin, bytes) => convert(origin, bytes) }
-        } else {
-          streams.map { case (origin, bytes) =>
-            convert(origin, bytes).getOrElse(invalidImageRow(origin))
-          }
-        }
-        session.createDataFrame(images, imageSchema)
-      }
-    }
-  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala
deleted file mode 100644
index e16ec906..0000000
--- a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.image
-
-import java.nio.file.Paths
-import java.util.Arrays
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.image.ImageSchema._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
-
-class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
-  // Single column of images named "image"
-  private lazy val imagePath = "../data/mllib/images/origin"
-
-  test("Smoke test: create basic ImageSchema dataframe") {
-    val origin = "path"
-    val width = 1
-    val height = 1
-    val nChannels = 3
-    val data = Array[Byte](0, 0, 0)
-    val mode = ocvTypes("CV_8UC3")
-
-    // Internal Row corresponds to image StructType
-    val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)),
-      Row(Row(null, height, width, nChannels, mode, data)))
-    val rdd = sc.makeRDD(rows)
-    val df = spark.createDataFrame(rdd, ImageSchema.imageSchema)
-
-    assert(df.count === 2, "incorrect image count")
-    assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
-  }
-
-  test("readImages count test") {
-    var df = readImages(imagePath)
-    assert(df.count === 1)
-
-    df = readImages(imagePath, null, true, -1, false, 1.0, 0)
-    assert(df.count === 10)
-
-    df = readImages(imagePath, null, true, -1, true, 1.0, 0)
-    val countTotal = df.count
-    assert(countTotal === 8)
-
-    df = readImages(imagePath, null, true, -1, true, 0.5, 0)
-    // Random number about half of the size of the original dataset
-    val count50 = df.count
-    assert(count50 > 0 && count50 < countTotal)
-  }
-
-  test("readImages test: recursive = false") {
-    val df = readImages(imagePath, null, false, 3, true, 1.0, 0)
-    assert(df.count() === 0)
-  }
-
-  test("readImages test: read jpg image") {
-    val df = readImages(imagePath + "/kittens/DP153539.jpg", null, false, 3, true, 1.0, 0)
-    assert(df.count() === 1)
-  }
-
-  test("readImages test: read png image") {
-    val df = readImages(imagePath + "/multi-channel/BGRA.png", null, false, 3, true, 1.0, 0)
-    assert(df.count() === 1)
-  }
-
-  test("readImages test: read non image") {
-    val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, true, 1.0, 0)
-    assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
-    assert(df.count() === 0)
-  }
-
-  test("readImages test: read non image and dropImageFailures is false") {
-    val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, false, 1.0, 0)
-    assert(df.count() === 1)
-  }
-
-  test("readImages test: sampleRatio > 1") {
-    val e = intercept[IllegalArgumentException] {
-      readImages(imagePath, null, true, 3, true, 1.1, 0)
-    }
-    assert(e.getMessage.contains("sampleRatio"))
-  }
-
-  test("readImages test: sampleRatio < 0") {
-    val e = intercept[IllegalArgumentException] {
-      readImages(imagePath, null, true, 3, true, -0.1, 0)
-    }
-    assert(e.getMessage.contains("sampleRatio"))
-  }
-
-  test("readImages test: sampleRatio = 0") {
-    val df = readImages(imagePath, null, true, 3, true, 0.0, 0)
-    assert(df.count() === 0)
-  }
-
-  test("readImages test: with sparkSession") {
-    val df = readImages(imagePath, sparkSession = spark, true, 3, true, 1.0, 0)
-    assert(df.count() === 8)
-  }
-
-  test("readImages partition test") {
-    val df = readImages(imagePath, null, true, 3, true, 1.0, 0)
-    assert(df.rdd.getNumPartitions === 3)
-  }
-
-  test("readImages partition test: < 0") {
-    val df = readImages(imagePath, null, true, -3, true, 1.0, 0)
-    assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism)
-  }
-
-  test("readImages partition test: = 0") {
-    val df = readImages(imagePath, null, true, 0, true, 1.0, 0)
-    assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism)
-  }
-
-  // Images with the different number of channels
-  test("readImages pixel values test") {
-
-    val images = readImages(imagePath + "/multi-channel/").collect
-
-    images.foreach { rrow =>
-      val row = rrow.getAs[Row](0)
-      val filename = Paths.get(getOrigin(row)).getFileName().toString()
-      if (firstBytes20.contains(filename)) {
-        val mode = getMode(row)
-        val bytes20 = getData(row).slice(0, 20)
-
-        val (expectedMode, expectedBytes) = firstBytes20(filename)
-        assert(ocvTypes(expectedMode) === mode, "mode of the image is not read correctly")
-        assert(Arrays.equals(expectedBytes, bytes20), "incorrect numeric value for flattened image")
-      }
-    }
-  }
-
-  // number of channels and first 20 bytes of OpenCV representation
-  // - default representation for 3-channel RGB images is BGR row-wise:
-  //   (B00, G00, R00,      B10, G10, R10,      ...)
-  // - default representation for 4-channel RGB images is BGRA row-wise:
-  //   (B00, G00, R00, A00, B10, G10, R10, A10, ...)
-  private val firstBytes20 = Map(
-    "grayscale.jpg" ->
-      (("CV_8UC1", Array[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62,
-        -57, -60, -63, -53, -49, -55, -69))),
-    "chr30.4.184.jpg" -> (("CV_8UC3",
-      Array[Byte](-9, -3, -1, -43, -32, -28, -75, -60, -57, -78, -59, -56, -74, -59, -57,
-        -71, -58, -56, -73, -64))),
-    "BGRA.png" -> (("CV_8UC4",
-      Array[Byte](-128, -128, -8, -1, -128, -128, -8, -1, -128,
-        -128, -8, -1, 127, 127, -9, -1, 127, 127, -9, -1))),
-    "BGRA_alpha_60.png" -> (("CV_8UC4",
-      Array[Byte](-128, -128, -8, 60, -128, -128, -8, 60, -128,
-        -128, -8, 60, 127, 127, -9, 60, 127, 127, -9, 60)))
-  )
-}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 38bb246..0ec2747 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -32,6 +32,24 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
   private lazy val imagePath = "../data/mllib/images/partitioned"
   private lazy val recursiveImagePath = "../data/mllib/images"
 
+  test("Smoke test: create basic ImageSchema dataframe") {
+    val origin = "path"
+    val width = 1
+    val height = 1
+    val nChannels = 3
+    val data = Array[Byte](0, 0, 0)
+    val mode = ocvTypes("CV_8UC3")
+
+    // Internal Row corresponds to image StructType
+    val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)),
+      Row(Row(null, height, width, nChannels, mode, data)))
+    val rdd = sc.makeRDD(rows)
+    val df = spark.createDataFrame(rdd, imageSchema)
+
+    assert(df.count === 2, "incorrect image count")
+    assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema")
+  }
+
   test("image datasource count test") {
     val df1 = spark.read.format("image").load(imagePath)
     assert(df1.count === 9)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 51d5861..4afca5a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -380,7 +380,11 @@ object MimaExcludes {
 
     // [SPARK-28556][SQL] QueryExecutionListener should also notify Error
     ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
+
+    // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages")
   )
 
   // Exclude rules for 2.4.x
diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py
index a1aacea..4fb1036 100644
--- a/python/pyspark/ml/image.py
+++ b/python/pyspark/ml/image.py
@@ -203,44 +203,6 @@ class _ImageSchema(object):
         return _create_row(self.imageFields,
                            [origin, height, width, nChannels, mode, data])
 
-    def readImages(self, path, recursive=False, numPartitions=-1,
-                   dropImageFailures=False, sampleRatio=1.0, seed=0):
-        """
-        Reads the directory of images from the local or remote source.
-
-        .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag,
-            there may be a race condition where one job overwrites the hadoop configs of another.
-
-        .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but
-            potentially non-deterministic.
-
-        .. note:: Deprecated in 2.4.0. Use `spark.read.format("image").load(path)` instead and
-            this `readImages` will be removed in 3.0.0.
-
-        :param str path: Path to the image directory.
-        :param bool recursive: Recursive search flag.
-        :param int numPartitions: Number of DataFrame partitions.
-        :param bool dropImageFailures: Drop the files that are not valid images.
-        :param float sampleRatio: Fraction of the images loaded.
-        :param int seed: Random number seed.
-        :return: a :class:`DataFrame` with a single column of "images",
-               see ImageSchema for details.
-
-        >>> df = ImageSchema.readImages('data/mllib/images/origin/kittens', recursive=True)
-        >>> df.count()
-        5
-
-        .. versionadded:: 2.3.0
-        """
-        warnings.warn("`ImageSchema.readImage` is deprecated. " +
-                      "Use `spark.read.format(\"image\").load(path)` instead.", DeprecationWarning)
-        spark = SparkSession.builder.getOrCreate()
-        image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema
-        jsession = spark._jsparkSession
-        jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
-                                          dropImageFailures, float(sampleRatio), seed)
-        return DataFrame(jresult, spark._wrapped)
-
 
 ImageSchema = _ImageSchema()
 
diff --git a/python/pyspark/ml/tests/test_image.py b/python/pyspark/ml/tests/test_image.py
index 95efa73..0008b0b 100644
--- a/python/pyspark/ml/tests/test_image.py
+++ b/python/pyspark/ml/tests/test_image.py
@@ -24,18 +24,24 @@ from pyspark.sql import HiveContext, Row
 from pyspark.testing.utils import QuietTest
 
 
-class ImageReaderTest(SparkSessionTestCase):
+class ImageFileFormatTest(SparkSessionTestCase):
 
     def test_read_images(self):
         data_path = 'data/mllib/images/origin/kittens'
-        df = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+        df = self.spark.read.format("image") \
+            .option("dropInvalid", True) \
+            .option("recursiveFileLookup", True) \
+            .load(data_path)
         self.assertEqual(df.count(), 4)
         first_row = df.take(1)[0][0]
+        # Compare `schema.simpleString()` instead of comparing the schemas directly,
+        # because the DataFrame loaded from the data source may change schema column nullability.
+        self.assertEqual(df.schema.simpleString(), ImageSchema.imageSchema.simpleString())
+        self.assertEqual(df.schema["image"].dataType.simpleString(),
+                         ImageSchema.columnSchema.simpleString())
         array = ImageSchema.toNDArray(first_row)
         self.assertEqual(len(array), first_row[1])
         self.assertEqual(ImageSchema.toImage(array, origin=first_row[0]), first_row)
-        self.assertEqual(df.schema, ImageSchema.imageSchema)
-        self.assertEqual(df.schema["image"].dataType, ImageSchema.columnSchema)
         expected = {'CV_8UC3': 16, 'Undefined': -1, 'CV_8U': 0, 'CV_8UC1': 0, 'CV_8UC4': 24}
         self.assertEqual(ImageSchema.ocvTypes, expected)
         expected = ['origin', 'height', 'width', 'nChannels', 'mode', 'data']
@@ -61,11 +67,11 @@ class ImageReaderTest(SparkSessionTestCase):
                 lambda: ImageSchema.toImage("a"))
 
 
-class ImageReaderTest2(PySparkTestCase):
+class ImageFileFormatOnHiveContextTest(PySparkTestCase):
 
     @classmethod
     def setUpClass(cls):
-        super(ImageReaderTest2, cls).setUpClass()
+        super(ImageFileFormatOnHiveContextTest, cls).setUpClass()
         cls.hive_available = True
         # Note that here we enable Hive's support.
         cls.spark = None
@@ -86,17 +92,20 @@ class ImageReaderTest2(PySparkTestCase):
 
     @classmethod
     def tearDownClass(cls):
-        super(ImageReaderTest2, cls).tearDownClass()
+        super(ImageFileFormatOnHiveContextTest, cls).tearDownClass()
         if cls.spark is not None:
             cls.spark.sparkSession.stop()
             cls.spark = None
 
     def test_read_images_multiple_times(self):
-        # This test case is to check if `ImageSchema.readImages` tries to
+        # This test case is to check if ImageFileFormat tries to
         # initiate Hive client multiple times. See SPARK-22651.
         data_path = 'data/mllib/images/origin/kittens'
-        ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
-        ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+        for i in range(2):
+            self.spark.read.format("image") \
+                .option("dropInvalid", True) \
+                .option("recursiveFileLookup", True) \
+                .load(data_path)
 
 
 if __name__ == "__main__":
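
For reference, a minimal end-to-end PySpark sketch of the workflow after this
commit, mirroring the updated test above (the path, options, and helper calls
are taken from the diff; treat it as an illustrative sketch):

    from pyspark.ml.image import ImageSchema
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.format("image") \
        .option("dropInvalid", True) \
        .option("recursiveFileLookup", True) \
        .load("data/mllib/images/origin/kittens")

    # The data source may relax column nullability, so compare the
    # simpleString() forms rather than the schema objects themselves.
    assert df.schema.simpleString() == ImageSchema.imageSchema.simpleString()

    first_row = df.take(1)[0][0]                  # struct in the "image" column
    array = ImageSchema.toNDArray(first_row)      # decode to a numpy ndarray
    image = ImageSchema.toImage(array, origin=first_row[0])  # back to a Row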

