Posted to reviews@spark.apache.org by codedeft <gi...@git.apache.org> on 2014/10/21 04:02:47 UTC

[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

GitHub user codedeft opened a pull request:

    https://github.com/apache/spark/pull/2868

    [SPARK-3161][MLLIB] Adding a node Id caching mechanism for training deci...

    ...sion trees. @jkbradley @mengxr @chouqin Please review this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AlpineNow/spark SPARK-3161

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2868
    
----
commit 9ea76df661a93b1ebdf5ce5a764c7549b2fcbfd0
Author: Sung Chung <sc...@alpinenow.com>
Date:   2014-10-21T01:49:43Z

    [SPARK-3161][MLLIB] Adding a node Id caching mechanism for training decision trees.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61026533
  
    I've been doing some testing of the node Id cache on a larger dataset (8 million rows with 784 features), and I don't think the node Id cache will do much for shallow trees. I'm trying to see where the 'sweet spot' is, but it may have to be well beyond depth 10 for the node Id cache to be useful.
    
    Anyhow, with only around 20 executors, it's taking an extremely long time to train these big trees to begin with. I actually gave up on training to depth 30 because it was taking upwards of 8 hours to train 100 trees. So local sub-tree training is really essential here.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61333710
  
    LGTM  Thanks!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195610
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
         }
     
         /**
    +     * Do the same thing as binSeqOp, but with nodeIdCache.
    +     */
    +    def binSeqOpWithNodeIdCache(
    +        agg: Array[DTStatsAggregator],
    +        dataPoint: (BaggedPoint[TreePoint], Array[Int])): Array[DTStatsAggregator] = {
    +      treeToNodeToIndexInfo.foreach { case (treeIndex, nodeIndexToInfo) =>
    +        val baggedPoint = dataPoint._1
    +        val nodeIdCache = dataPoint._2
    +        val nodeIndex = nodeIdCache(treeIndex)
    +        val nodeInfo = nodeIndexToInfo.getOrElse(nodeIndex, null)
    +        // We are processing this point only if it's in the nodeIndexToInfo map.
    --- End diff --
    
    Will do that.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61188396
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22565/
    Test PASSed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61167709
  
    I ran some local tests but did not see any speedups.  This was trying to mimic your earlier test:
    * original mnist dataset
    * depths 5, 10, 20, and 30
    * 1 compute node (figuring 1 node would make node ID caching most helpful)
    Running times were virtually the same between master and master + this PR.  I only tested with 1 tree and 5 trees, but I would not expect the results to differ much with 100 trees.
    
    Thinking about when distributed node ID caching might speed things up, I could imagine it being most helpful with an imbalanced tree, where there was some path which a lot of instances followed.  I.e., the tree gets very deep before we can switch to local training.  This sounds a bit unlikely to me.  However, I would be OK with merging this as protection against such an event, especially since I did not see slowdowns from this PR.
    
    What do you think?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19686166
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala ---
    @@ -102,6 +105,15 @@ object DecisionTreeRunner {
             .text(s"fraction of data to hold out for testing.  If given option testInput, " +
               s"this option is ignored. default: ${defaultParams.fracTest}")
             .action((x, c) => c.copy(fracTest = x))
    +      opt[Boolean]("useNodeIdCache")
    +        .text(s"whether to use node Id cache during training.")
    --- End diff --
    
    Could you please print the default values for the 3 new options?
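    
    For illustration, a minimal sketch of what that might look like for one of the
    options, assuming a `useNodeIdCache` field exists on `defaultParams` (this
    mirrors the pattern already used for fracTest above):
    
        opt[Boolean]("useNodeIdCache")
          .text(s"whether to use node Id cache during training, " +
            s"default: ${defaultParams.useNodeIdCache}")
          .action((x, c) => c.copy(useNodeIdCache = x))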




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by chouqin <gi...@git.apache.org>.
Github user chouqin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19132675
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    --- End diff --
    
    Can you use a while loop to do this?
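    
    For reference, the cfor call above could be replaced with a plain while loop
    along these lines (a sketch only: the loop body is elided in the diff, so the
    accesses to dataPoint and the updater lookup here are assumptions):
    
        var treeId = 0
        while (treeId < nodeIdUpdaters.length) {
          val updater = nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), null)
          if (updater != null) {
            // Move this instance from its current node to the appropriate child.
            dataPoint._2(treeId) =
              updater.updateNodeIndex(dataPoint._1.datum.binnedFeatures, bins)
          }
          treeId += 1
        }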




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19509616
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    --- End diff --
    
    Can this be a temp value within `updateNodeIndices()`?  It could hold a reference to the current RDD until the next RDD is ready and persisted/checkpointed, and then it could be unpersisted.  Currently, it looks like 2 RDDs will be persisted at any given time (rather than just during `updateNodeIndices()`).
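    
    A sketch of that suggestion (assumed signature; the actual index-update logic
    is elided here), so the old RDD is only held during the update:
    
        def updateNodeIndices(
            data: RDD[BaggedPoint[TreePoint]],
            nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
            bins: Array[Array[Bin]]): Unit = {
          val prev = nodeIdsForInstances
          nodeIdsForInstances = data.zip(prev).map { case (point, ids) =>
            // ... update ids using nodeIdUpdaters and bins (elided) ...
            ids
          }.persist(StorageLevel.MEMORY_AND_DISK)
          // Materialize the new RDD before releasing the old one, so at most one
          // extra copy is alive, and only inside this method.
          nodeIdsForInstances.count()
          prev.unpersist()
        }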




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19508647
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    --- End diff --
    
    Organize imports (this one above the org.apache.spark imports)




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19178079
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
         // Finally, only best Splits for nodes are collected to driver to construct decision tree.
         val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
         val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
    -    val nodeToBestSplits =
    +
    +    val partitionAggregates = if (useNodeIdCache) {
    --- End diff --
    
    Also, could the 2 branches (of the ```if (useNodeIdCache)```) call be combined, where you test for ```useNodeIdCache``` only for the 1 line which differs?
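    
    One way to sketch that combination (assumed names; the aggregator allocation,
    written here as a hypothetical newNodeStatsAggregators(), is unchanged from the
    existing code) is to factor the per-partition aggregation into a helper so only
    the input RDD and the seqOp differ:
    
        def aggregatePartitions[T](rdd: RDD[T])(
            seqOp: (Array[DTStatsAggregator], T) => Array[DTStatsAggregator]) =
          rdd.mapPartitions { points =>
            val agg = newNodeStatsAggregators() // hypothetical: per-node aggregators
            Iterator(points.foldLeft(agg)(seqOp))
          }
    
        val partitionAggregates = if (useNodeIdCache) {
          aggregatePartitions(input.zip(nodeIdCache.get.nodeIdsForInstances))(
            binSeqOpWithNodeIdCache)
        } else {
          aggregatePartitions(input)(binSeqOp)
        }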




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61173555
  
    I agree it probably only used 2 executors since there were only 2 partitions for the data.  (I think reduceByKey uses the same partitioner by default.)
    
    I think it'd be good to include it now, to be used more in the future with other optimizations and for dumping trees to disk, as you point out.
    
    Thanks for running more tests!  Please let me know when it's ready for a final pass.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61170031
  
    Hm, I see. I'll try testing again on the small mnist, but my previous test was on a cluster with 8 executors. However, I realize now that it probably only utilized 2 out of the 8 executors (it seems like the reduceByKey that's used doesn't really use the extra executors?).
    
    In addition to being useful for deep trees and local training, I think another use of the node Id cache is writing the models directly to disk without keeping them in memory.
    
    So even if we don't see any performance benefit for now, I do think we'll need this later. I guess the decision on whether to include it now or later is really up to you; it's just that you probably don't want to leave it hanging for too long. As long as it gets in soon after the release, I'm OK.
    
    I'm currently running a deep-tree test on mnist8m, albeit with only 10 trees. I'll see if I can find some benefit here at least.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60723953
  
    Updated code has been submitted: at every iteration, it persists the new cache values while unpersisting the old ones.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59998408
  
    @codedeft  Done with a pass.  It's looking quite good.  My main comments are about code duplication and simplification; I like the general approach.  Let me know when I should make another pass.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19247637
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    True, since Node indices are Integers (not Longs), 30+ level trees are a problem.  We could definitely switch to Long at some point---or even eliminate indices, though that might require extra work in some places.  I tried to keep the node indexing logic grouped within Node.scala at least.
    
    My opinion is that the current indexing system is reasonable; if we really support 62+ depth trees, then moving to larger integer types seems OK.  It would be a bit more storage but not that much relative to node size.  Also, at that depth, individual trees would likely need to be distributed anyways (making storage less of an issue) (unless the tree were extremely unbalanced).  What do you think?
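    
    To make the arithmetic behind those depth limits concrete: with the 1-based
    indexing used here (the root is node 1, and node i has children 2i and 2i+1),
    the largest index at depth d is 2^(d+1) - 1, which is where the 30 and 62
    figures come from:
    
        def maxIndexAtDepth(d: Int): BigInt = (BigInt(1) << (d + 1)) - 1
    
        maxIndexAtDepth(30) == BigInt(Int.MaxValue)   // true: depth 30 just fits in an Int
        maxIndexAtDepth(31) >  BigInt(Int.MaxValue)   // true: depth 31+ overflows an Int
        maxIndexAtDepth(62) == BigInt(Long.MaxValue)  // true: a Long runs out near depth 62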




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59877519
  
    Jenkins, add to whitelist.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59877812
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21959/
    Test FAILed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59878898
  
    Seems like there are lots of 'line too long' messages. Will address them.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61359747
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22684/
    Test PASSed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510500
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -584,6 +648,13 @@ object DecisionTree extends Serializable with Logging {
     
         timer.stop("chooseSplits")
     
    +    val nodeIdUpdaters = if (nodeIdCache.nonEmpty) {
    +      Array
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60711275
  
      [Test build #22332 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22332/consoleFull) for   PR 2868 at commit [`e08ef62`](https://github.com/apache/spark/commit/e08ef62b7026b342ba5b4623734a0879533a3aec).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19518756
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    +
    +  // To keep track of the past checkpointed RDDs.
    --- End diff --
    
    An alternative solution to checkpointing is to recompute `nodeIds` from scratch every few iterations. It is reliable as long as the driver stays alive. If we do that, users don't need to set a checkpoint dir and we don't need to manage checkpoint files. I hope the overhead is not big.
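    
    A rough sketch of that alternative (hypothetical names: topNodes for the
    per-tree roots, predictNodeIndex for a helper that walks an instance down the
    splits decided so far):
    
        if (rddUpdateCount % recomputeInterval == 0) {
          val prev = nodeIdsForInstances
          // Rebuild from the raw data, truncating the lineage instead of checkpointing.
          nodeIdsForInstances = data.map { baggedPoint =>
            Array.tabulate(topNodes.length) { treeId =>
              predictNodeIndex(topNodes(treeId), baggedPoint.datum.binnedFeatures, bins)
            }
          }.persist(StorageLevel.MEMORY_AND_DISK)
          prev.unpersist()
        }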




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510120
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19178539
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -629,6 +699,10 @@ object DecisionTree extends Serializable with Logging {
           }
         }
     
    +    if (useNodeIdCache) {
    --- End diff --
    
    Small comment: I wonder if it would be better to check nodeIdCache.nonEmpty instead of using another value useNodeIdCache; it seems less error-prone.
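    
    Sketched, assuming nodeIdCache is an Option[NodeIdCache]:
    
        if (nodeIdCache.nonEmpty) {
          nodeIdCache.get.updateNodeIndices(nodeIdUpdaters, bins)
        }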




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61394403
  
    @codedeft The merge script didn't close this PR automatically. Could you help close it? Thanks!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft closed the pull request at:

    https://github.com/apache/spark/pull/2868




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60013991
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61359743
  
      [Test build #22684 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22684/consoleFull) for   PR 2868 at commit [`5f5a156`](https://github.com/apache/spark/commit/5f5a1564af1a8a1cbf6d257941ad969169295fe7).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61153266
  
    I got similar results on 16 nodes using MNIST8m; basically no change in runtime (+/- a few percent at most).  But those tests were for shallow trees.  I worry that this patch will only help for depths at which we might as well switch to local training.  In tests with Sequoia Forests, do you know how much distributed node ID caching helps?  I wonder if node ID caching will only be helpful for local training.
    
    Running more tests...




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19509769
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    --- End diff --
    
    Can a new method be added to clean up any remaining checkpoint files at the end of training?
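    
    For example, a hedged sketch of what such a method might look like (it assumes the `checkpointQueue` field discussed in this class and the existing `org.apache.hadoop.fs.Path` import in this file; the method name is illustrative, not the merged code):
    ```
    def deleteAllCheckpoints(): Unit = {
      while (checkpointQueue.nonEmpty) {
        val old = checkpointQueue.dequeue()
        // Remove the checkpoint data from the file system, if any was written.
        old.getCheckpointFile.foreach { file =>
          val path = new Path(file)
          path.getFileSystem(old.sparkContext.hadoopConfiguration).delete(path, true)
        }
      }
    }
    ```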




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195595
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -584,6 +642,9 @@ object DecisionTree extends Serializable with Logging {
     
         timer.stop("chooseSplits")
     
    +    val nodeIdUpdaters = Array
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19177555
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
         }
     
         /**
    +     * Do the same thing as bingSeqOp, but with nodeIdCache.
    --- End diff --
    
    "bingSeqOp" --> "binSeqOp"




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59879975
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21965/consoleFull) for   PR 2868 at commit [`6b05af0`](https://github.com/apache/spark/commit/6b05af042656b192e7b14954a433a75468df1d1c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61388530
  
    I've merged this into master. Thanks @codedeft for adding node id caching, and @chouqin @manishamde @jkbradley for reviewing the code!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60634290
  
    CC: @manishamde If you have time to take a look!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19306308
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    I think that one good benefit of decision trees over nearest-neighbor methods, even in an unpruned state, is the compactness of the representation. Decision tree models still take a lot less storage than having to store the entire dataset.
    
    But yea, like a lot of other non-parametric algorithms (e.g. SVMs), it still suffers from models that grow in proportion to the dataset size, especially if unpruned.





[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61037009
  
    I agree local sub-tree training will be needed to train deep trees.  That should probably be the next priority.  I'm running some tests now and will see if I see different speedups (also on mnist8m, though I may try some synthetic datasets later for variety).




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60713165
  
    Here's one number. Note that this requires constantly re-persisting the new node Id cache and unpersisting the old one, which is not reflected in the code yet. I'm not sure whether frequently persisting a new RDD derived from a previously persisted RDD is a cheap operation, but at least on this dataset it seems fast. Let me know if you know more about the persistence mechanism.
    
    mnist dataset, 750 columns with 60000 rows (only two partitions). 8 executors. 10-class classification. 100 trees trained, 30 max depth. Gini. With the default fraction settings.
    
    Without node Id caching, it took 24 minutes 34 seconds.
    With node Id caching, persisting the cache every two iterations, it took 16 minutes 42 seconds.
    
    So we see noticeable benefits, as long as we frequently re-persist the node Id cache; see the sketch below.
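    
    For reference, a minimal sketch of the re-persist/unpersist cycle I mean (the names `nodeIds`, `updatedNodeIds`, and `updateIds` are illustrative, not the patch's actual code; it assumes `nodeIds` is a var):
    ```
    import org.apache.spark.storage.StorageLevel
    
    // Derive the next node Id cache from the current one and persist it.
    val updatedNodeIds = nodeIds.map(ids => updateIds(ids))
    updatedNodeIds.persist(StorageLevel.MEMORY_AND_DISK)
    updatedNodeIds.count()  // materialize before dropping the previous cache
    nodeIds.unpersist(blocking = false)
    nodeIds = updatedNodeIds
    ```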





[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61335058
  
    Some sort of YARN failure.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61336982
  
    Yep, apparently so, but someone's working on fixing it ASAP




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19509528
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    --- End diff --
    
    Can `checkpointQueue` and `rddUpdateCount` be made private?
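    
    i.e., something like this (a trivial sketch of the suggested visibility change):
    ```
    // Keep the checkpoint bookkeeping internal to NodeIdCache.
    private val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    private var rddUpdateCount = 0
    ```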




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195515
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60707307
  
      [Test build #22332 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22332/consoleFull) for   PR 2868 at commit [`e08ef62`](https://github.com/apache/spark/commit/e08ef62b7026b342ba5b4623734a0879533a3aec).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61334809
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22639/
    Test FAILed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510115
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61165010
  
    I too hope caching will be useful later on.  One last thing I'm trying is running locally (on a beefier machine than my laptop).  If it helps in local mode, it might be worth keeping, especially since I did not see slow-downs in the distributed setting.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195486
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    --- End diff --
    
    Will do. I used it because spire was included somehow (maybe one of the dependent packages uses it).




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by chouqin <gi...@git.apache.org>.
Github user chouqin commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59893259
  
    @codedeft Thanks for your nice work. I have added some comments inline. Here are some high-level comments:
    
    1. Have you tested the performance after this change? As discussed in [SPARK-3161](https://issues.apache.org/jira/browse/SPARK-3161), this will help little for shallow trees. How much performance gain does this change give for deep trees? If the gain is substantial, I think we should add more unit tests for this option and refactor the code to reduce duplication (for example, there is some duplication between `binSeqOp` and `binSeqOpWithNodeIdCache`).
    
    2. Is checkpointing really necessary to avoid long lineage? Maybe my understanding is not right, but to my knowledge, the `nodeIdCache` will be recomputed each time we do an aggregation. If we persist it to disk (using `persist(StorageLevel.MEMORY_AND_DISK)`) and unpersist it once it is no longer needed, then it will be persisted to disk each time it gets computed. We could remove checkpointing and make the code cleaner and faster (checkpointing persists the RDD to a distributed file system); see the sketch below.
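    
    A minimal sketch of the alternative I have in mind (hedged; `nodeIdCache` is just an illustrative RDD handle here, not the field in this patch):
    ```
    import org.apache.spark.storage.StorageLevel
    
    // Persist to memory with spill to local disk, instead of checkpointing
    // to a distributed file system.
    nodeIdCache.persist(StorageLevel.MEMORY_AND_DISK)
    // ... run the per-iteration aggregations that read nodeIdCache ...
    nodeIdCache.unpersist()  // drop it once it is no longer needed
    ```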




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61375497
  
    It finally finished.
    
    10 Trees, 30 depth limit. mnist8m, 20 executors:
    
    15 hours with node Id cache.
    21 hours without node Id cache.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195598
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -629,6 +699,10 @@ object DecisionTree extends Serializable with Logging {
           }
         }
     
    +    if (useNodeIdCache) {
    --- End diff --
    
    Yea, maybe it's not needed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19508498
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -613,6 +684,14 @@ object DecisionTree extends Serializable with Logging {
               node.rightNode = Some(Node(Node.rightChildIndex(nodeIndex),
                 stats.rightPredict, stats.rightImpurity, rightChildIsLeaf))
     
    +          if (nodeIdCache.nonEmpty) {
    +            val nodeIndexUpdater = NodeIndexUpdater(
    +              split = split,
    +              nodeIndex = nodeIndex)
    +            nodeIdUpdaters(treeIndex) =
    +              nodeIdUpdaters(treeIndex) ++ Map(nodeIndex -> nodeIndexUpdater)
    --- End diff --
    
    Have you checked the timing to see if building a Map like this causes issues with garbage collection?  I'm wondering if something like org.apache.spark.util.collection.OpenHashMap would be more efficient; you could construct the map with an OpenHashMap and then convert it to an immutable map.  I've only tested locally so far, and it does not seem to be an issue.  But we can keep it in mind for distributed tests if we ever see that the internal timing for findBestSplits differs significantly from chooseSplits.
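    
    To illustrate just the build-then-freeze pattern (using the standard library's mutable.HashMap in the sketch, since OpenHashMap is a private Spark utility whose exact signature I won't assume; `nodeIndex` and `split` come from this diff):
    ```
    import scala.collection.mutable
    
    // Accumulate updaters in a mutable map, then freeze once at the end.
    val updaters = mutable.HashMap.empty[Int, NodeIndexUpdater]
    updaters(nodeIndex) = NodeIndexUpdater(split = split, nodeIndex = nodeIndex)
    val frozen: Map[Int, NodeIndexUpdater] = updaters.toMap
    ```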




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59871504
  
    Jenkins, please start the test!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59877810
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21959/consoleFull) for   PR 2868 at commit [`9ea76df`](https://github.com/apache/spark/commit/9ea76df661a93b1ebdf5ce5a764c7549b2fcbfd0).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61335866
  
    Yea, I'm also getting a YARN compilation failure on my machine after doing the latest pull. Is this happening everywhere?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codeleft <gi...@git.apache.org>.
Github user codeleft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61365784
  
    @manishamde the person you want to respond to is @codedeft. I'm not involved with this project. Our names are close, but off by one letter. Sorry for the intrusion, I'll see myself out.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19176539
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    +          treeId => {
    +            val nodeIdUpdater = nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), null)
    +            if (nodeIdUpdater != null) {
    +              val newNodeIndex = nodeIdUpdater.updateNodeIndex(
    +                binnedFeatures = dataPoint._1.datum.binnedFeatures,
    +                bins = bins)
    +              dataPoint._2(treeId) = newNodeIndex
    +            }
    +          }
    +        )
    +
    +        dataPoint._2
    +      }
    +    }
    +
    +    cur = updatedRDD
    +    rddUpdateCount += 1
    +
    +    // Handle checkpointing if the directory is not None.
    +    if (checkpointDir != None && (rddUpdateCount % checkpointInterval) == 0) {
    +      // Let's see if we can unpersist previous checkpoints.
    +      var canUnpersist = true
    +      while (checkpointQueue.size > 1 && canUnpersist) {
    +        // We can unpersist the oldest checkpoint iff
    +        // the next checkpoint actually exists in the file system.
    +        if (checkpointQueue.get(1).get.getCheckpointFile != None) {
    +          val old = checkpointQueue.dequeue()
    +          old.unpersist()
    --- End diff --
    
    To remove a checkpoint, you'll need to delete the checkpoint file manually.  You should be able to do something like:
    ```
    val fs = new Path(checkpointDir).getFileSystem(sparkContext.hadoopConfiguration)
    fs.delete(new Path(checkpointFile), true)  // checkpoint data is a directory, so delete recursively
    ```
    after the next checkpoint has finished.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510132
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    --- End diff --
    
    Will do.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59884734
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21966/consoleFull) for   PR 2868 at commit [`13585e8`](https://github.com/apache/spark/commit/13585e8738e35743c6c0ab482d34552f01939bd4).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)`
      * `  class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])`
      * `  case class ReconnectWorker(masterUrl: String) extends DeployMessage`
      * `class Predict(`
      * `case class EvaluatePython(`



[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by chouqin <gi...@git.apache.org>.
Github user chouqin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19132973
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    --- End diff --
    
    Just a suggestion: we can remove this `data` field from the class, because it is only used by the `updateNodeIndices` function; the name `NodeIdCache` will make more sense that way. We can pass an `RDD[BaggedPoint[TreePoint]]` to `updateNodeIndices` explicitly.
    
    By the way, can you give `cur` a more meaningful name (such as `nodeIdsForInstances`)?
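
    A rough sketch of what that refactoring could look like (signatures only; a suggestion, not the final code):
    ```
    // NodeIdCache no longer holds the training data; callers pass it in.
    private[tree] class NodeIdCache(
        var nodeIdsForInstances: RDD[Array[Int]],  // renamed from `cur`
        val checkpointDir: Option[String],
        val checkpointInterval: Int) {

      // The RDD of training rows is now an explicit argument.
      def updateNodeIndices(
          data: RDD[BaggedPoint[TreePoint]],
          nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
          bins: Array[Array[Bin]]): Unit = {
        // zip `data` with `nodeIdsForInstances` and apply the updaters, as before
      }
    }
    ```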


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19513979
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    --- End diff --
    
    take(1) is probably a reasonable thing to do. But a hack nonetheless. Lol
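
    For context, `persist` only marks the RDD for caching; nothing is materialized until an action runs a job. A small sketch of the options being discussed (illustrative; `nodeIdsForInstances` as in this PR):
    ```
    import org.apache.spark.storage.StorageLevel

    val updated = nodeIdsForInstances.persist(StorageLevel.MEMORY_AND_DISK)

    // take(1) triggers a job but may compute only the first partition,
    // so it doesn't guarantee the whole RDD is cached -- hence "a hack".
    updated.take(1)

    // An action that touches every partition materializes the full cache:
    updated.foreachPartition(_ => ())  // or updated.count()
    ```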


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19177914
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
         // Finally, only best Splits for nodes are collected to driver to construct decision tree.
         val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
         val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
    -    val nodeToBestSplits =
    +
    +    val partitionAggregates = if (useNodeIdCache) {
    --- End diff --
    
    Could you please state the type of partitionAggregates for code clarity?
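
    For example, the annotation could read something like this (a sketch; the pairing is my reading of the diff above):
    ```
    // Each element pairs a node index with the DTStatsAggregator holding
    // that node's sufficient statistics.
    val partitionAggregates: RDD[(Int, DTStatsAggregator)] =
      if (useNodeIdCache) {
        ??? // node-id-cache path from the diff above
      } else {
        ??? // original binSeqOp path
      }
    ```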


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59884334
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21965/
    Test PASSed.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510465
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    --- End diff --
    
    That's what I tried first, but then I found out that the RDD doesn't actually get persisted when I call `persist`, since `persist` is lazy: the actual persistence won't happen until some action gets called outside of this method. Do you know of a way to trigger persistence right there and then? Same with checkpointing; that would make life much easier from a management perspective (I personally don't like these implicit under-the-hood things very much... ;)).
    
    Additionally, I think there'll always be some window where two RDDs are persisted at once, since for performance you want to derive the next persisted RDD from the previously persisted one. That's one downside of this approach: it will require more memory than the direct-tree approach, at least until the trees get really big.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19173742
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    --- End diff --
    
    I second that, especially if it eliminates the dependence on spire (since spire is not used elsewhere in Spark).
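
    For reference, the `cfor` call could become a plain while loop with no extra dependency, along these lines (a sketch; the loop body is elided):
    ```
    // Standard-library equivalent of cfor(0)(_ < nodeIdUpdaters.length, _ + 1):
    var treeId = 0
    while (treeId < nodeIdUpdaters.length) {
      // ... same per-tree update body as in the diff above ...
      treeId += 1
    }
    ```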


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59884738
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21966/
    Test PASSed.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by manishamde <gi...@git.apache.org>.
Github user manishamde commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61359008
  
    @codedeft I agree that local training should be a high priority. Just curious -- what's the depth of the tree in the failing case?
    
    I vote for merging this PR since there is no loss in performance for shallow trees and a gain in performance for deep trees.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60721861
  
      [Test build #22351 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22351/consoleFull) for   PR 2868 at commit [`58a7b3e`](https://github.com/apache/spark/commit/58a7b3eaa7a51bfc2da85c7144bd721e5c252acf).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59877748
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21959/consoleFull) for   PR 2868 at commit [`9ea76df`](https://github.com/apache/spark/commit/9ea76df661a93b1ebdf5ce5a764c7549b2fcbfd0).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61340121
  
      [Test build #498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/498/consoleFull) for   PR 2868 at commit [`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61189986
  
    Ok, my performance test on the small mnist is still consistent (100 trees, depth limit of 30). I think the big reason for this is that when it's actually running in a cluster (as opposed to locally), we're actually transferring the trees, and the transfer time can get significant once the model gets larger.
    
    This time the runs took the following (there was another heavy workload in the cluster, so everything was slower overall):
    without node id cache: 30 mins 57 seconds
    with node id cache and checkpointing every 10 iterations: 19 mins 10 seconds
    
    This is shown in the time between 'collectAsMap' and 'mapPartitions'. Near the end, we see entries like these without node id cache.
    
    213	mapPartitions at DecisionTree.scala:618 +details   2014/10/30 16:19:56	6 s	
    212	collectAsMap at DecisionTree.scala:647 +details    2014/10/30 16:19:43	    0.2 s
    
    As you can see, although collectAsMap only took 0.2 seconds starting from 16:19:43, the mapPartitions doesn't start until 13 seconds later! So although the actual mapPartitions process took only 6 seconds, the overall time it took was 19 seconds.
    
    Early on, the time in between is much smaller:
    
    45	mapPartitions at DecisionTree.scala:618 +details  2014/10/30 15:56:09	5 s	
    44	collectAsMap at DecisionTree.scala:647 +details 2014/10/30 15:56:05	    3 s
    
    In contrast, with node Id cache there's very little time in between these two steps, whether early or late in the process, although in general mapPartitions seems to take a little more time:
    
    44	mapPartitions at DecisionTree.scala:600 +details 2014/10/30 16:28:36	6 s
    43	collectAsMap at DecisionTree.scala:647 +details 2014/10/30 16:28:33	   3 s
    
    212	mapPartitions at DecisionTree.scala:600 +details 2014/10/30 16:41:49	7 s	
    211	collectAsMap at DecisionTree.scala:647 +details 2014/10/30 16:41:46	   2 s
    
    I guess the reason we don't see as much improvement with larger datasets is that mapPartitions takes much longer, so the additional time spent transferring models becomes comparatively smaller as a percentage.
    
    I'm still running the 10-tree, depth-30 mnist8m test; it's been running for 5+ hours.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19247743
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
         // Finally, only best Splits for nodes are collected to driver to construct decision tree.
         val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
         val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
    -    val nodeToBestSplits =
    +
    +    val partitionAggregates = if (useNodeIdCache) {
    +      input.zip(nodeIdCache.get.cur).mapPartitions { points =>
    +        // Construct a nodeStatsAggregators array to hold node aggregate stats,
    +        // each node will have a nodeStatsAggregator
    +        val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
    +          val featuresForNode = nodeToFeaturesBc.value.flatMap { nodeToFeatures =>
    +            Some(nodeToFeatures(nodeIndex))
    +          }
    +          new DTStatsAggregator(metadata, featuresForNode)
    +        }
    +
    +        // iterator all instances in current partition and update aggregate stats
    +        points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
    --- End diff --
    
    Good point; ignore my comment please.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61188389
  
      [Test build #22565 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22565/consoleFull) for   PR 2868 at commit [`54656c5`](https://github.com/apache/spark/commit/54656c53cd30f243e9a7ff6ad76a2daede3b7aa0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19512071
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    --- End diff --
    
    @mengxr  Can you weigh in on this (if you know of a good fix)?  Thanks!


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60845877
  
    @codedeft  Thanks for the updates!  I added some small comments above.  Feel free to ignore the OpenHashMap suggestion, unless you find a problem in your tests.  After these small fixes, I think it will be ready.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59877524
  
    test this please


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61334801
  
      [Test build #22639 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22639/consoleFull) for   PR 2868 at commit [`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195587
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    --- End diff --
    
    Will do.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60729383
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22351/
    Test PASSed.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510109
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -613,6 +684,14 @@ object DecisionTree extends Serializable with Logging {
               node.rightNode = Some(Node(Node.rightChildIndex(nodeIndex),
                 stats.rightPredict, stats.rightImpurity, rightChildIsLeaf))
     
    +          if (nodeIdCache.nonEmpty) {
    +            val nodeIndexUpdater = NodeIndexUpdater(
    +              split = split,
    +              nodeIndex = nodeIndex)
    +            nodeIdUpdaters(treeIndex) =
    +              nodeIdUpdaters(treeIndex) ++ Map(nodeIndex -> nodeIndexUpdater)
    --- End diff --
    
    This was one of those thoughtless choices; I didn't think it would have that much of a performance impact. But I could simply change this to a mutable Map if you'd like. I think it'll be cheaper and maybe slightly faster, particularly for this kind of repeated collection addition.
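
    Something like this is what I have in mind (a sketch; `numTrees`, `treeIndex`, `nodeIndex`, and `split` come from the surrounding code):
    ```
    import scala.collection.mutable

    // One mutable map per tree avoids rebuilding an immutable Map on
    // every insertion.
    val nodeIdUpdaters = Array.fill[mutable.Map[Int, NodeIndexUpdater]](numTrees)(
      mutable.Map[Int, NodeIndexUpdater]())

    // In the split-selection loop, insertion becomes an in-place update:
    nodeIdUpdaters(treeIndex)(nodeIndex) = NodeIndexUpdater(split = split, nodeIndex = nodeIndex)
    ```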


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59869002
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60712109
  
    Currently doing some performance testing.


[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19509685
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    --- End diff --
    
    Please add ":: DeveloperApi ::" to start of doc here.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60040201
  
    Thanks for all the comments guys. I'll address them and resubmit.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19510480
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    --- End diff --
    
    Will do.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61190259
  
    I've addressed the comments. Please review at your convenience. I'll publish some big data results once they are actually done.
    
    Thanks!




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by manishamde <gi...@git.apache.org>.
Github user manishamde commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61375098
  
    @codedeft I am so sorry. 




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59880277
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21966/consoleFull) for   PR 2868 at commit [`13585e8`](https://github.com/apache/spark/commit/13585e8738e35743c6c0ab482d34552f01939bd4).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by manishamde <gi...@git.apache.org>.
Github user manishamde commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61156969
  
    @codedeft @jkbradley I have not followed the discussion very closely (apologies!), but at a high level, could we add local training support along with this PR, possibly after the 1.2 code freeze? A lot of effort has been put into this, and it would be a shame to see it wasted, especially since it is relevant to local training. I vote for keeping this PR alive and adding the local training feature to it. :-)




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61358798
  
    @mengxr @jkbradley Can you merge this? This is the only way you can effectively train 10 large trees with the mnist8m dataset.
    
    With node Id cache, it took a very long time, but we were able to finish training 10 trees on mnist8m in 15 hours with 20 executors. SF with local training can finish this in 20 minutes, so local training would be a must in the next release.
    
    However, without node Id cache, it looks like it's not even possible. It's currently only 60% of the way there, has already taken 13 hours, and has hit dozens of fetch failures. I suspect it might eventually just fail because the models are too big to pass around.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195461
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    Yes, I noticed that you have a rule for determining the node indices through bit shifts. However, I was wondering whether this is something that could change in the future, and whether it might be better to leave that logic outside.
    
    E.g., this indexing seems to be a primary reason that trees deeper than 30 levels can't be trained at the moment, and you might want to use different logic in the future.
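
    A back-of-envelope check of the depth limit mentioned here, assuming the usual 1-based heap indexing (left child 2*i, right child 2*i+1), under which the leftmost node at depth d has index 2^d:

        // Scala sketch: depth-31 node indices no longer fit in a 32-bit Int.
        println(BigInt(2).pow(31) > BigInt(Int.MaxValue))   // true  -> overflow at depth 31
        println(BigInt(2).pow(30) <= BigInt(Int.MaxValue))  // true  -> depth 30 still fits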




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61327827
  
      [Test build #22639 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22639/consoleFull) for   PR 2868 at commit [`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19518754
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  var nodeIdsForInstances: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // Keep a reference to a previous node Ids for instances.
    +  // Because we will keep on re-persisting updated node Ids,
    +  // we want to unpersist the previous RDD.
    +  var prevNodeIdsForInstances: RDD[Array[Int]] = null
    --- End diff --
    
    The current implementation should be correct. When we compute `nodeIds` (this is where `nodeIds` gets materialized), `prevNodeIds` should be cached. So after the computation, we have both RDDs cached. `count()` doesn't help here. `take(1)` may only cache the first partition.
    
    One thing to watch is the closure size of those RDDs. If we happen to reference lots of stuff, it will increase the storage requirement on the driver, which needs to remember how to re-compute each RDD.
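
    A minimal sketch of the lifecycle described above (the helper name and types are illustrative, not the PR's API):

        import org.apache.spark.rdd.RDD
        import org.apache.spark.storage.StorageLevel

        // `updated` is derived from `prev`, so `prev` must stay cached until a
        // downstream job (e.g. the stats aggregation) has materialized `updated`.
        def swapCache(prev: RDD[Array[Int]], updated: RDD[Array[Int]]): RDD[Array[Int]] = {
          updated.persist(StorageLevel.MEMORY_AND_DISK)
          // ...the per-iteration training job runs here and materializes `updated`...
          if (prev != null) prev.unpersist()  // both are now cached; drop the old one
          updated
        }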




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19686161
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        Node.leftChildIndex(nodeIndex)
    +      } else {
    +        Node.rightChildIndex(nodeIndex)
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param nodeIdsForInstances The initial values in the cache
    --- End diff --
    
    A bit unclear; perhaps update to: "For each TreePoint, an array over trees of the node index in each tree.  (Initially, values should all be 1 for root node.)"




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19175188
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    --- End diff --
    
    When you rename ```cur``` to ```SOMENAME```, it would be great to rename ```updatedRDD``` as well to ```updatedSOMENAME```




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59974518
  
    @chouqin  Checkpointing is helpful since it is more persistent than persist().  Checkpointing stores data to HDFS (with replication), so that the RDD is stored even if a worker dies.  With persist(), part of the RDD will be lost when a worker dies.  For my big decision tree tests, I do see EC2 workers die periodically (though not that often), and I am sure it is a bigger issue for corporate (big) clusters.
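
    A minimal sketch of the distinction (the checkpoint path is a placeholder):

        import org.apache.spark.{SparkConf, SparkContext}
        import org.apache.spark.storage.StorageLevel

        val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo"))
        sc.setCheckpointDir("hdfs:///tmp/checkpoints")  // replicated on HDFS

        val nodeIds = sc.parallelize(1 to 1000000).map(_ => Array(1))
        nodeIds.persist(StorageLevel.MEMORY_AND_DISK)  // fast, but lost if a worker dies
        nodeIds.checkpoint()                           // durable copy; truncates lineage
        nodeIds.count()                                // first action triggers the checkpoint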




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195671
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
         // Finally, only best Splits for nodes are collected to driver to construct decision tree.
         val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
         val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
    -    val nodeToBestSplits =
    +
    +    val partitionAggregates = if (useNodeIdCache) {
    +      input.zip(nodeIdCache.get.cur).mapPartitions { points =>
    +        // Construct a nodeStatsAggregators array to hold node aggregate stats,
    +        // each node will have a nodeStatsAggregator
    +        val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
    +          val featuresForNode = nodeToFeaturesBc.value.flatMap { nodeToFeatures =>
    +            Some(nodeToFeatures(nodeIndex))
    +          }
    +          new DTStatsAggregator(metadata, featuresForNode)
    +        }
    +
    +        // iterator all instances in current partition and update aggregate stats
    +        points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
    --- End diff --
    
    Well, one requires zip and the other one doesn't, so it fundamentally changes the row type.
    
    Additionally, if we just branch within mapPartitions, won't it unnecessarily serialize things that are used in one branch but not the other? E.g., it seems that binSeqOp itself becomes an object and will be serialized, along with the items its closure captures.
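
    A small, self-contained illustration of that point (simplified types; `useNodeIdCache` and the RDDs are stand-ins, not the PR code):

        import org.apache.spark.{SparkConf, SparkContext}

        val sc = new SparkContext(
          new SparkConf().setAppName("branch-demo").setMaster("local[2]"))
        val input = sc.parallelize(1 to 100)
        val useNodeIdCache = true

        // Branching on the driver ships only the chosen closure to executors;
        // branching inside mapPartitions would capture (and serialize) both paths.
        val result =
          if (useNodeIdCache) {
            val nodeIds = sc.parallelize((1 to 100).map(_ => 1))
            input.zip(nodeIds).map { case (x, id) => x + id }  // zip changes the row type
          } else {
            input.map(_ + 1)
          }
        println(result.count())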




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61358651
  
      [Test build #22684 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22684/consoleFull) for   PR 2868 at commit [`5f5a156`](https://github.com/apache/spark/commit/5f5a1564af1a8a1cbf6d257941ad969169295fe7).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19176858
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    +          treeId => {
    +            val nodeIdUpdater = nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), null)
    +            if (nodeIdUpdater != null) {
    +              val newNodeIndex = nodeIdUpdater.updateNodeIndex(
    +                binnedFeatures = dataPoint._1.datum.binnedFeatures,
    +                bins = bins)
    +              dataPoint._2(treeId) = newNodeIndex
    +            }
    +          }
    +        )
    +
    +        dataPoint._2
    +      }
    +    }
    +
    +    cur = updatedRDD
    +    rddUpdateCount += 1
    +
    +    // Handle checkpointing if the directory is not None.
    +    if (checkpointDir != None && (rddUpdateCount % checkpointInterval) == 0) {
    +      // Let's see if we can unpersist previous checkpoints.
    +      var canUnpersist = true
    +      while (checkpointQueue.size > 1 && canUnpersist) {
    +        // We can unpersist the oldest checkpoint iff
    +        // the next checkpoint actually exists in the file system.
    +        if (checkpointQueue.get(1).get.getCheckpointFile != None) {
    +          val old = checkpointQueue.dequeue()
    +          old.unpersist()
    +        } else {
    +          canUnpersist = false
    +        }
    +      }
    +
    +      cur.persist(StorageLevel.MEMORY_AND_DISK)
    +      cur.checkpoint()
    +      checkpointQueue.enqueue(cur)
    +    }
    +  }
    +}
    +
    +@DeveloperApi
    +private[tree] object NodeIdCache {
    +  /**
    +   * Initialize the node Id cache with initial node Id values.
    +   * @param data The RDD of training rows.
    +   * @param numTrees The number of trees that we want to create cache for.
    +   * @param checkpointDir The checkpoint directory where the checkpointed files will be stored.
    +   * @param checkpointInterval The checkpointing interval
    +   *                           (how often should the cache be checkpointed.).
    +   * @param initVal The inital values in the cache.
    --- End diff --
    
    "inital" ---> "initial"




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19178115
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -553,7 +589,26 @@ object DecisionTree extends Serializable with Logging {
         // Finally, only best Splits for nodes are collected to driver to construct decision tree.
         val nodeToFeatures = getNodeToFeatures(treeToNodeToIndexInfo)
         val nodeToFeaturesBc = input.sparkContext.broadcast(nodeToFeatures)
    -    val nodeToBestSplits =
    +
    +    val partitionAggregates = if (useNodeIdCache) {
    +      input.zip(nodeIdCache.get.cur).mapPartitions { points =>
    +        // Construct a nodeStatsAggregators array to hold node aggregate stats,
    +        // each node will have a nodeStatsAggregator
    +        val nodeStatsAggregators = Array.tabulate(numNodes) { nodeIndex =>
    +          val featuresForNode = nodeToFeaturesBc.value.flatMap { nodeToFeatures =>
    +            Some(nodeToFeatures(nodeIndex))
    +          }
    +          new DTStatsAggregator(metadata, featuresForNode)
    +        }
    +
    +        // iterator all instances in current partition and update aggregate stats
    +        points.foreach(binSeqOpWithNodeIdCache(nodeStatsAggregators, _))
    --- End diff --
    
    (from above)  You can just test ```useNodeIdCache``` at this line.  That should reduce code duplication.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19508499
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -584,6 +648,13 @@ object DecisionTree extends Serializable with Logging {
     
         timer.stop("chooseSplits")
     
    +    val nodeIdUpdaters = if (nodeIdCache.nonEmpty) {
    +      Array
    --- End diff --
    
    fits on one line




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61310862
  
    @codedeft  I added 2 small comments.  Other than that, it LGTM.  Thanks for the PR!
    CC: @mengxr




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59975425
  
    @codedeft  Thanks for the PR!  I'll make a pass now.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19174918
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    It would be cheaper (and maybe easier) to pass an Array[Map[Int, Split]] (replacing NodeIndexUpdater with Split).  You already have that info when you construct nodeIdUpdaters in DecisionTree.scala, and you don't have to explicitly store the left/right child IDs.  You can compute child IDs from the parent ID using Node.leftChildIndex (and Node.rightChildIndex).  Hopefully that simplifies the code some.
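
    A rough sketch of the suggested shape (names are illustrative; `goesLeft` stands in for the continuous/categorical test in updateNodeIndex):

        // Keep only the Split per parent node and derive child indices from the
        // parent index, as Node.leftChildIndex / Node.rightChildIndex do (2*i, 2*i+1).
        def childIndex(parentIndex: Int, goesLeft: Boolean): Int =
          if (goesLeft) parentIndex << 1 else (parentIndex << 1) + 1

        // The per-tree updaters then become Array[Map[Int, Split]] keyed by parent index:
        //   val split = nodeIdUpdaters(treeId)(parentIndex)
        //   nodeIds(treeId) = childIndex(parentIndex, goesLeft(split, binnedFeatures, bins))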




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61358267
  
    The conflict is caused by the GBoosting check-in. I'm taking a look.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61179636
  
      [Test build #22565 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22565/consoleFull) for   PR 2868 at commit [`54656c5`](https://github.com/apache/spark/commit/54656c53cd30f243e9a7ff6ad76a2daede3b7aa0).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19195544
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    +          treeId => {
    +            val nodeIdUpdater = nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), null)
    +            if (nodeIdUpdater != null) {
    +              val newNodeIndex = nodeIdUpdater.updateNodeIndex(
    +                binnedFeatures = dataPoint._1.datum.binnedFeatures,
    +                bins = bins)
    +              dataPoint._2(treeId) = newNodeIndex
    +            }
    +          }
    +        )
    +
    +        dataPoint._2
    +      }
    +    }
    +
    +    cur = updatedRDD
    +    rddUpdateCount += 1
    +
    +    // Handle checkpointing if the directory is not None.
    +    if (checkpointDir != None && (rddUpdateCount % checkpointInterval) == 0) {
    +      // Let's see if we can unpersist previous checkpoints.
    +      var canUnpersist = true
    +      while (checkpointQueue.size > 1 && canUnpersist) {
    +        // We can unpersist the oldest checkpoint iff
    +        // the next checkpoint actually exists in the file system.
    +        if (checkpointQueue.get(1).get.getCheckpointFile != None) {
    +          val old = checkpointQueue.dequeue()
    +          old.unpersist()
    --- End diff --
    
    I could do this. But then, do you really want checkpoints deleted manually here? I was wondering if you guys had any plans for managing this implicitly underneath, since that seems like a more logical place.
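
    For reference, a hedged sketch of what manual deletion could look like with Hadoop's FileSystem API (an assumption about the approach, not code from this PR):
    
    ```scala
    import org.apache.hadoop.fs.Path
    import org.apache.spark.rdd.RDD
    
    // Delete the materialized checkpoint files of an RDD that is no
    // longer needed (getCheckpointFile is None until materialization).
    def removeCheckpointFile[T](old: RDD[T]): Unit = {
      old.getCheckpointFile.foreach { file =>
        val path = new Path(file)
        val fs = path.getFileSystem(old.sparkContext.hadoopConfiguration)
        fs.delete(path, true) // recursive
      }
    }
    ```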




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19178228
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -584,6 +642,9 @@ object DecisionTree extends Serializable with Logging {
     
         timer.stop("chooseSplits")
     
    +    val nodeIdUpdaters = Array
    --- End diff --
    
    Only allocate if useNodeIdCache?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19249614
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    I think that this is fine as long as you get relatively balanced trees, since the tree would be extremely big by the time you reach level 30.
    
    However, based on my experience, the problem in practice is that you often get unbalanced trees. E.g., when I trained on the 60,000-sample mnist set without pruning, I often got a tree close to ~100 levels deep, even though it only had around 5,000 nodes.
    
    I will do it the way you suggested, simply calling Node.leftChildIndex and Node.rightChildIndex.
    
    For the future, though, I think a relatively easy way to do this might be to assign Ids in a FIFO fashion (keep the last assigned Id and add 1 for every child node that comes in). It will likely lead to slightly more complicated code, but I think it'll be easier than removing node Ids entirely.
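
    A minimal sketch of that FIFO idea (the class and method names here are hypothetical, not from this PR):
    
    ```scala
    // Ids are handed out sequentially as nodes are split, so they stay
    // dense even for deep, unbalanced trees (heap-style indices grow as
    // 2^depth and overflow an Int past depth ~30).
    class FifoNodeIdAssigner {
      private var lastAssignedId: Int = 1 // the root gets id 1
    
      // Returns (leftChildId, rightChildId) for the node being split.
      def assignChildIds(): (Int, Int) = {
        val left = lastAssignedId + 1
        val right = lastAssignedId + 2
        lastAssignedId = right
        (left, right)
      }
    }
    ```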




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60729378
  
      [Test build #22351 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22351/consoleFull) for   PR 2868 at commit [`58a7b3e`](https://github.com/apache/spark/commit/58a7b3eaa7a51bfc2da85c7144bd721e5c252acf).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61155725
  
    Yea, I'm trying to run depth-30 tests, but I got failures (both with and without node Id cache) that seem to happen often in our clusters when using TorrentBroadcast. I'm testing again with HttpBroadcast. But anyhow, I have a hard time imagining people training deep trees without local training, so for now the node Id cache seems not very necessary.
    
    I think, though, that this might be a good addition in preparation for local training later. Eventually, once deep trees become easy to train, passing them back and forth would not be advisable. So this could be a check-in that prepares for that future. What do you think?
    
    It's hard to compare against Sequoia Forest because SF's data structures have been highly optimized, and currently, even without local training, it runs about 3 times faster than this (e.g., it took 18 minutes to train 100 trees with a depth limit of 10 on mnist8m, whereas DecisionTreeRunner took about an hour).
    
    I think it comes down to a lot of small things (e.g., SF doesn't need to pass bin information back and forth, it avoids map structures to prevent auto-boxing and allow faster lookups, etc.), so I'm not sure the node Id cache had anything to do with it. These are optimizations we can add to MLlib later as well.
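
    To illustrate the auto-boxing point, a toy comparison (not SF or MLlib code):
    
    ```scala
    // A Map[Int, T] boxes every Int key into a java.lang.Integer on
    // each lookup; a dense Array indexed by node id avoids that
    // allocation, at the cost of space for unused slots.
    object BoxingDemo {
      def main(args: Array[String]): Unit = {
        val byMap: Map[Int, Double] = Map(1 -> 0.5)
        val byArray: Array[Double] = { val a = new Array[Double](8); a(1) = 0.5; a }
        println(byMap.getOrElse(1, 0.0)) // key 1 is boxed before the hash lookup
        println(byArray(1))              // primitive indexing, no boxing
      }
    }
    ```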




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19177832
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
    @@ -515,6 +523,34 @@ object DecisionTree extends Serializable with Logging {
         }
     
         /**
    +     * Do the same thing as binSeqOp, but with nodeIdCache.
    +     */
    +    def binSeqOpWithNodeIdCache(
    +        agg: Array[DTStatsAggregator],
    +        dataPoint: (BaggedPoint[TreePoint], Array[Int])): Array[DTStatsAggregator] = {
    +      treeToNodeToIndexInfo.foreach { case (treeIndex, nodeIndexToInfo) =>
    +        val baggedPoint = dataPoint._1
    +        val nodeIdCache = dataPoint._2
    +        val nodeIndex = nodeIdCache(treeIndex)
    +        val nodeInfo = nodeIndexToInfo.getOrElse(nodeIndex, null)
    +        // We are processing this point only if it's in the nodeIndexToInfo map.
    --- End diff --
    
    Could the chunk of code below be removed into another function so that it is not duplicated in binSeqOp?  The function could sit outside of findBestSplits().




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61212783
  
    Your test analysis is pretty convincing!  Keeping the PR sounds good.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60863634
  
    Another thought: This checkpointing logic seems like it will be useful for a bunch of algorithms in the future.  It would be nice to abstract it into some class which has standard parameters (checkpointInterval, etc.) and a method like:
    ```
    def checkpointIfNeeded(dataTransform: RDD => RDD)
    ```
    which would handle persisting, checkpointing, unpersisting, and removing checkpoint files.  It can be a future PR though.  It would be useful for the GradientBoosting PR: [https://github.com/apache/spark/pull/2607]
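
    As a rough illustration, a sketch of what that abstraction could look like (all names are hypothetical; no such class exists in Spark at this point):
    
    ```scala
    import scala.collection.mutable
    import org.apache.spark.rdd.RDD
    
    // Persists and checkpoints every checkpointInterval-th update, and
    // unpersists older checkpoints once a newer one has materialized.
    class PeriodicRddCheckpointer[T](checkpointInterval: Int) {
      private val checkpointQueue = mutable.Queue[RDD[T]]()
      private var updateCount = 0
    
      def update(newRdd: RDD[T]): Unit = {
        updateCount += 1
        if (updateCount % checkpointInterval == 0) {
          newRdd.persist()
          newRdd.checkpoint()
          checkpointQueue.enqueue(newRdd)
          // Drop the oldest checkpoint only once the next one is on disk.
          while (checkpointQueue.size > 1 &&
              checkpointQueue(1).getCheckpointFile.isDefined) {
            checkpointQueue.dequeue().unpersist()
          }
        }
      }
    }
    ```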




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19508975
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Node, Split}
    +
    +import org.apache.hadoop.fs.{Path, FileSystem}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param nodeIndex The current node index of a data point that this will update.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    nodeIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    --- End diff --
    
    Since you now have `split` available, can you replace `bins(featureIndex)(binIndex).highSplit.threshold` with `split.threshold` and then eliminate the `bins` argument?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19174472
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    --- End diff --
    
    Currently, this skips checkpointing if checkpointDir == None.  However, a user could set the SparkContext checkpointDir before calling DecisionTree.  Can the behavior be changed as follows:
    * If a checkpointDir is given here, then it should override any preset checkpointDir in the SparkContext.
    * If no checkpointDir is given, then the code should check the SparkContext (via cur.sparkContext.getCheckpointDir) to see if one has already been set; see the sketch after this list.
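
    A small sketch of that precedence logic (the method name is made up; `cur` and `checkpointDir` are the NodeIdCache constructor arguments):
    
    ```scala
    import org.apache.spark.rdd.RDD
    
    // An explicitly passed directory wins; otherwise fall back to any
    // directory the user has already set on the SparkContext.
    def checkpointingEnabled(cur: RDD[Array[Int]],
        checkpointDir: Option[String]): Boolean = checkpointDir match {
      case Some(dir) =>
        cur.sparkContext.setCheckpointDir(dir)
        true
      case None =>
        cur.sparkContext.getCheckpointDir.isDefined
    }
    ```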




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59884328
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21965/consoleFull) for   PR 2868 at commit [`6b05af0`](https://github.com/apache/spark/commit/6b05af042656b192e7b14954a433a75468df1d1c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19300415
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    --- End diff --
    
    I agree it could get deep quickly, and switching indexing systems seems good for a future PR.  (W.r.t. training without pruning, I suspect it would be better and more efficient to use outright nearest neighbor since completely unpruned trees are basically doing nearest neighbor.)




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2868#discussion_r19247666
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/impl/NodeIdCache.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.tree.impl
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.tree.configuration.FeatureType._
    +import spire.implicits._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.tree.model.{Bin, Split}
    +
    +/**
    + * :: DeveloperApi ::
    + * This is used by the node id cache to find the child id that a data point would belong to.
    + * @param split Split information.
    + * @param leftChildIndex Left child index.
    + * @param rightChildIndex Right child index.
    + */
    +@DeveloperApi
    +private[tree] case class NodeIndexUpdater(
    +    split: Split,
    +    leftChildIndex: Int,
    +    rightChildIndex: Int) {
    +  /**
    +   * Determine a child node index based on the feature value and the split.
    +   * @param binnedFeatures Binned feature values.
    +   * @param bins Bin information to convert the bin indices to approximate feature values.
    +   * @return Child node index to update to.
    +   */
    +  def updateNodeIndex(binnedFeatures: Array[Int], bins: Array[Array[Bin]]): Int = {
    +    if (split.featureType == Continuous) {
    +      val featureIndex = split.feature
    +      val binIndex = binnedFeatures(featureIndex)
    +      val featureValueUpperBound = bins(featureIndex)(binIndex).highSplit.threshold
    +      if (featureValueUpperBound <= split.threshold) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    } else {
    +      if (split.categories.contains(binnedFeatures(split.feature).toDouble)) {
    +        leftChildIndex
    +      } else {
    +        rightChildIndex
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * A given TreePoint would belong to a particular node per tree.
    + * This is used to keep track of which node for a particular tree that a TreePoint belongs to.
    + * A separate RDD of Array[Int] needs to be maintained and updated at each iteration.
    + * @param data The RDD of training rows.
    + * @param cur The initial values in the cache
    + *            (should be an Array of all 1's (meaning the root nodes)).
    + * @param checkpointDir The checkpoint directory where
    + *                      the checkpointed files will be stored.
    + * @param checkpointInterval The checkpointing interval
    + *                           (how often should the cache be checkpointed.).
    + */
    +@DeveloperApi
    +private[tree] class NodeIdCache(
    +  val data: RDD[BaggedPoint[TreePoint]],
    +  var cur: RDD[Array[Int]],
    +  val checkpointDir: Option[String],
    +  val checkpointInterval: Int) {
    +
    +  // To keep track of the past checkpointed RDDs.
    +  val checkpointQueue = mutable.Queue[RDD[Array[Int]]]()
    +  var rddUpdateCount = 0
    +  if (checkpointDir != None) {
    +    cur.sparkContext.setCheckpointDir(checkpointDir.get)
    +  }
    +
    +  /**
    +   * Update the node index values in the cache.
    +   * This updates the RDD and its lineage.
    +   * TODO: Passing bin information to executors seems unnecessary and costly.
    +   * @param nodeIdUpdaters A map of node index updaters.
    +   *                       The key is the indices of nodes that we want to update.
    +   * @param bins Bin information needed to find child node indices.
    +   */
    +  def updateNodeIndices(
    +      nodeIdUpdaters: Array[Map[Int, NodeIndexUpdater]],
    +      bins: Array[Array[Bin]]): Unit = {
    +    val updatedRDD = data.zip(cur).map {
    +      dataPoint => {
    +        cfor(0)(_ < nodeIdUpdaters.length, _ + 1)(
    +          treeId => {
    +            val nodeIdUpdater = nodeIdUpdaters(treeId).getOrElse(dataPoint._2(treeId), null)
    +            if (nodeIdUpdater != null) {
    +              val newNodeIndex = nodeIdUpdater.updateNodeIndex(
    +                binnedFeatures = dataPoint._1.datum.binnedFeatures,
    +                bins = bins)
    +              dataPoint._2(treeId) = newNodeIndex
    +            }
    +          }
    +        )
    +
    +        dataPoint._2
    +      }
    +    }
    +
    +    cur = updatedRDD
    +    rddUpdateCount += 1
    +
    +    // Handle checkpointing if the directory is not None.
    +    if (checkpointDir != None && (rddUpdateCount % checkpointInterval) == 0) {
    +      // Let's see if we can unpersist previous checkpoints.
    +      var canUnpersist = true
    +      while (checkpointQueue.size > 1 && canUnpersist) {
    +        // We can unpersist the oldest checkpoint iff
    +        // the next checkpoint actually exists in the file system.
    +        if (checkpointQueue.get(1).get.getCheckpointFile != None) {
    +          val old = checkpointQueue.dequeue()
    +          old.unpersist()
    --- End diff --
    
    @mengxr Do you know of such plans?




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-60711278
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22332/
    Test PASSed.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-61335353
  
      [Test build #498 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/498/consoleFull) for   PR 2868 at commit [`a078fc8`](https://github.com/apache/spark/commit/a078fc867417c2100e6ea3dd5da19a94ae55fe36).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3161][MLLIB] Adding a node Id caching m...

Posted by codedeft <gi...@git.apache.org>.
Github user codedeft commented on the pull request:

    https://github.com/apache/spark/pull/2868#issuecomment-59879666
  
    test this please

