Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/12 09:42:28 UTC

[GitHub] [spark] WeichenXu123 commented on a change in pull request #30009: [SPARK-32907][ML] adaptively blockify instances - LinearSVC

WeichenXu123 commented on a change in pull request #30009:
URL: https://github.com/apache/spark/pull/30009#discussion_r503159247



##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -100,6 +102,23 @@ private[spark] case class InstanceBlock(
 
 private[spark] object InstanceBlock {
 
+  private def getBlockSize(
+      numCols: Long,
+      numRows: Long,
+      nnz: Long,
+      allUnitWeight: Boolean): Long = {
+    val doubleBytes = java.lang.Double.BYTES
+    val arrayHeader = 12L
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrixSize = math.min(denseSize, sparseSize)
+    if (allUnitWeight) {
+      matrixSize + doubleBytes * numRows + arrayHeader * 2

Review comment:
       Should this be `+ 1x arrayHeader`? When all weights are 1 the block's weights array is empty, so it seems only the labels array needs a full header here.
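       For reference, one way to read the two-header accounting (my assumption about the intent, nothing in the diff states it): the block stores a labels array plus a weights array, and with all-unit weights the weights array is empty but may still carry its own object header. A runnable sketch:
       ```scala
       // Hedged sketch of the all-unit-weight accounting in the diff above;
       // constants mirror getBlockSize, matrixSize stands in for the matrix estimate.
       def unitWeightBlockBytes(numRows: Long, matrixSize: Long): Long = {
         val doubleBytes = java.lang.Double.BYTES // 8
         val arrayHeader = 12L                    // assumed JVM array header size
         val labelsBytes = doubleBytes * numRows + arrayHeader // label data + header
         val weightsBytes = arrayHeader // empty weights array: header only?
         matrixSize + labelsBytes + weightsBytes // == matrixSize + 8 * numRows + 2 * arrayHeader
       }
       ```
       If the empty weights array is a shared singleton (e.g. `Array.emptyDoubleArray`), its header is amortized across all blocks and `+ 1x arrayHeader` would be the tighter estimate.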

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,65 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemoryUsage(
+      iterator: Iterator[Instance],
+      maxMemoryUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemoryUsage > 0)
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var numCols = -1L
+    var count = 0L
+    var nnz = 0L

Review comment:
       nnz => buffNnz, to make clear this is the buffer-level accumulator rather than a per-instance count.

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,65 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemoryUsage(
+      iterator: Iterator[Instance],
+      maxMemoryUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemoryUsage > 0)
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var numCols = -1L
+    var count = 0L
+    var nnz = 0L
+    var allUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val n = instance.features.numNonzeros
+      var block = Option.empty[InstanceBlock]
+      // Check if enough memory remains to add this instance to the block.
+      if (getBlockSize(numCols, count + 1L, nnz + n,
+        allUnitWeight && (instance.weight == 1)) > maxMemoryUsage) {
+        // Check if this instance is too large
+        require(count > 0, s"instance $instance exceeds memory limit $maxMemoryUsage, " +
+          s"please increase block size")
+
+        block = Some(InstanceBlock.fromInstances(buff.result()))
+        buff.clear()
+        count = 0L
+        nnz = 0L
+        allUnitWeight = true
+      }
+      buff += instance
+      count += 1L
+      nnz += n
+      allUnitWeight &&= (instance.weight == 1)
+      block.iterator
+    } ++ {
+      val instances = buff.result()
+      if (instances.nonEmpty) {
+        Iterator.single(InstanceBlock.fromInstances(instances))
+      } else Iterator.empty
+    }
+  }

Review comment:
       This iterator logic would be clearer as a for loop with `yield`; see the sketch below.
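       A minimal sketch of that restructuring (hedged, not the PR's code: same state and helpers as the diff above, with the nnz/buffNnz renames from this review applied; `Instance`, `InstanceBlock` and `getBlockSize` are assumed from the surrounding file):
       ```scala
       import scala.collection.mutable

       def blokifyWithMaxMemoryUsage(
           iterator: Iterator[Instance],
           maxMemoryUsage: Long): Iterator[InstanceBlock] = {
         require(maxMemoryUsage > 0)
         val buff = mutable.ArrayBuilder.make[Instance]
         var numCols = -1L
         var count = 0L
         var buffNnz = 0L
         var allUnitWeight = true

         // Emit the buffered instances as one block and reset all accumulators.
         def flush(): InstanceBlock = {
           val block = InstanceBlock.fromInstances(buff.result())
           buff.clear()
           count = 0L
           buffNnz = 0L
           allUnitWeight = true
           block
         }

         val blocks = for {
           instance <- iterator
           flushed <- {
             if (numCols < 0L) numCols = instance.features.size
             require(numCols == instance.features.size)
             val nnz = instance.features.numNonzeros
             // Flush the buffer first if adding this instance would exceed the budget.
             val block =
               if (getBlockSize(numCols, count + 1L, buffNnz + nnz,
                   allUnitWeight && instance.weight == 1) > maxMemoryUsage) {
                 require(count > 0, s"instance $instance exceeds memory limit " +
                   s"$maxMemoryUsage, please increase block size")
                 Some(flush())
               } else None
             buff += instance
             count += 1L
             buffNnz += nnz
             allUnitWeight &&= (instance.weight == 1)
             block
           }
         } yield flushed

         // Iterator.++ evaluates its argument lazily, so the trailing partial
         // block is built only after `blocks` has been fully consumed.
         blocks ++ (if (count > 0) Iterator.single(flush()) else Iterator.empty)
       }
       ```
       The mutable accumulators are unchanged; only the shape of the pipeline differs, so the reader sees one generator per instance and one optional flushed block per step.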

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -114,6 +133,65 @@ private[spark] object InstanceBlock {
   def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
     instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
   }
+
+  def blokifyWithMaxMemoryUsage(
+      iterator: Iterator[Instance],
+      maxMemoryUsage: Long): Iterator[InstanceBlock] = {
+    require(maxMemoryUsage > 0)
+    val buff = mutable.ArrayBuilder.make[Instance]
+    var numCols = -1L
+    var count = 0L
+    var nnz = 0L
+    var allUnitWeight = true
+
+    iterator.flatMap { instance =>
+      if (numCols < 0L) numCols = instance.features.size
+      require(numCols == instance.features.size)
+      val n = instance.features.numNonzeros

Review comment:
       n => nnz, since this is the per-instance nonzero count (with the accumulator above renamed to buffNnz).

##########
File path: mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
##########
@@ -100,6 +102,23 @@ private[spark] case class InstanceBlock(
 
 private[spark] object InstanceBlock {
 
+  private def getBlockSize(

Review comment:
       To be semantically accurate, rename this to getBlockMemUsage: it estimates a memory footprint in bytes, whereas "block size" elsewhere in this PR means a number of rows.
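       e.g. (a delegating sketch just to show the signature; in the PR the body of `getBlockSize` would simply move under the new name):
       ```scala
       // Sketch of the suggested rename: the helper estimates a memory footprint
       // in bytes, while "block size" elsewhere in this PR means a row count.
       private def getBlockMemUsage(
           numCols: Long,
           numRows: Long,
           nnz: Long,
           allUnitWeight: Boolean): Long =
         getBlockSize(numCols, numRows, nnz, allUnitWeight)
       ```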



