You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "zhengruifeng (Jira)" <ji...@apache.org> on 2020/01/13 12:13:00 UTC

[jira] [Updated] (SPARK-30503) OnlineLDAOptimizer does not handle persistance correctly

     [ https://issues.apache.org/jira/browse/SPARK-30503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhengruifeng updated SPARK-30503:
---------------------------------
    Description: 
It seems that in {{OnlineLDAOptimizer, }}{{PeriodicGraphCheckpointer}} can not unpersit edges correctly.
{code:java}
scala> import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.clustering.LDA

scala> val dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
20/01/13 20:00:30 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan. dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> val lda = new LDA().setK(10).setMaxIter(100).setOptimizer("em")
lda: org.apache.spark.ml.clustering.LDA = lda_0e9a6cf09801

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> val model = lda.fit(dataset)
model: org.apache.spark.ml.clustering.LDAModel = DistributedLDAModel: uid=lda_0e9a6cf09801, k=10, numFeatures=11

scala> sc.getPersistentRDDs
res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(809 -> EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119, 1337 -> EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119, 977 -> EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119, 1073 -> EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119, 449 -> EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119, 1793 -> EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119, 185 -> EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119, 1001 -> EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119, 1601 -> EdgeRDD MapPartitionsRDD[1601] at mapPartitions a...

scala> sc.getPersistentRDDs.size
res2: Int = 106

scala> sc.getPersistentRDDs.foreach(println)
(809,EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119)
(1337,EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119)
(977,EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119)
(1073,EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119)
(449,EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119)
(1793,EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119)
(185,EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119)
(1001,EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119)
(1601,EdgeRDD MapPartitionsRDD[1601] at mapPartitions at EdgeRDDImpl.scala:119)
(1529,EdgeRDD MapPartitionsRDD[1529] at mapPartitions at EdgeRDDImpl.scala:119)
(1265,EdgeRDD MapPartitionsRDD[1265] at mapPartitions at EdgeRDDImpl.scala:119)
(257,EdgeRDD MapPartitionsRDD[257] at mapPartitions at EdgeRDDImpl.scala:119)
(1409,EdgeRDD MapPartitionsRDD[1409] at mapPartitions at EdgeRDDImpl.scala:119)
(1985,EdgeRDD MapPartitionsRDD[1985] at mapPartitions at EdgeRDDImpl.scala:119)
(785,EdgeRDD MapPartitionsRDD[785] at mapPartitions at EdgeRDDImpl.scala:119)
(1313,EdgeRDD MapPartitionsRDD[1313] at mapPartitions at EdgeRDDImpl.scala:119)
(1577,EdgeRDD MapPartitionsRDD[1577] at mapPartitions at EdgeRDDImpl.scala:119)
(881,EdgeRDD MapPartitionsRDD[881] at mapPartitions at EdgeRDDImpl.scala:119)
(29,VertexRDD, VertexRDD ZippedPartitionsRDD2[29] at zipPartitions at VertexRDD.scala:322)
(2105,EdgeRDD MapPartitionsRDD[2105] at mapPartitions at EdgeRDDImpl.scala:119)
(353,EdgeRDD MapPartitionsRDD[353] at mapPartitions at EdgeRDDImpl.scala:119)
(905,EdgeRDD MapPartitionsRDD[905] at mapPartitions at EdgeRDDImpl.scala:119)
(1169,EdgeRDD MapPartitionsRDD[1169] at mapPartitions at EdgeRDDImpl.scala:119)
(89,EdgeRDD MapPartitionsRDD[89] at mapPartitions at EdgeRDDImpl.scala:119)
(1433,EdgeRDD MapPartitionsRDD[1433] at mapPartitions at EdgeRDDImpl.scala:119)
(1697,EdgeRDD MapPartitionsRDD[1697] at mapPartitions at EdgeRDDImpl.scala:119)
(233,EdgeRDD MapPartitionsRDD[233] at mapPartitions at EdgeRDDImpl.scala:119)
(761,EdgeRDD MapPartitionsRDD[761] at mapPartitions at EdgeRDDImpl.scala:119)
(2441,EdgeRDD MapPartitionsRDD[2441] at mapPartitions at EdgeRDDImpl.scala:119)
(2249,EdgeRDD MapPartitionsRDD[2249] at mapPartitions at EdgeRDDImpl.scala:119)
(1217,EdgeRDD MapPartitionsRDD[1217] at mapPartitions at EdgeRDDImpl.scala:119)
(137,EdgeRDD MapPartitionsRDD[137] at mapPartitions at EdgeRDDImpl.scala:119)
(2414,VertexRDD, VertexRDD ZippedPartitionsRDD2[2414] at zipPartitions at VertexRDD.scala:322)
(65,EdgeRDD MapPartitionsRDD[65] at mapPartitions at EdgeRDDImpl.scala:119)
(329,EdgeRDD MapPartitionsRDD[329] at mapPartitions at EdgeRDDImpl.scala:119)
(665,EdgeRDD MapPartitionsRDD[665] at mapPartitions at EdgeRDDImpl.scala:119)
(1457,EdgeRDD MapPartitionsRDD[1457] at mapPartitions at EdgeRDDImpl.scala:119)
(2345,EdgeRDD MapPartitionsRDD[2345] at mapPartitions at EdgeRDDImpl.scala:119)
(1121,EdgeRDD MapPartitionsRDD[1121] at mapPartitions at EdgeRDDImpl.scala:119)
(593,EdgeRDD MapPartitionsRDD[593] at mapPartitions at EdgeRDDImpl.scala:119)
(857,EdgeRDD MapPartitionsRDD[857] at mapPartitions at EdgeRDDImpl.scala:119)
(1361,EdgeRDD MapPartitionsRDD[1361] at mapPartitions at EdgeRDDImpl.scala:119)
(1937,EdgeRDD MapPartitionsRDD[1937] at mapPartitions at EdgeRDDImpl.scala:119)
(1889,EdgeRDD MapPartitionsRDD[1889] at mapPartitions at EdgeRDDImpl.scala:119)
(2153,EdgeRDD MapPartitionsRDD[2153] at mapPartitions at EdgeRDDImpl.scala:119)
(569,EdgeRDD MapPartitionsRDD[569] at mapPartitions at EdgeRDDImpl.scala:119)
(1241,EdgeRDD MapPartitionsRDD[1241] at mapPartitions at EdgeRDDImpl.scala:119)
(2057,EdgeRDD MapPartitionsRDD[2057] at mapPartitions at EdgeRDDImpl.scala:119)
(953,EdgeRDD MapPartitionsRDD[953] at mapPartitions at EdgeRDDImpl.scala:119)
(425,EdgeRDD MapPartitionsRDD[425] at mapPartitions at EdgeRDDImpl.scala:119)
(2033,EdgeRDD MapPartitionsRDD[2033] at mapPartitions at EdgeRDDImpl.scala:119)
(32,EdgeRDD MapPartitionsRDD[32] at mapPartitions at EdgeRDDImpl.scala:119)
(161,EdgeRDD MapPartitionsRDD[161] at mapPartitions at EdgeRDDImpl.scala:119)
(689,EdgeRDD MapPartitionsRDD[689] at mapPartitions at EdgeRDDImpl.scala:119)
(2225,EdgeRDD MapPartitionsRDD[2225] at mapPartitions at EdgeRDDImpl.scala:119)
(2393,EdgeRDD MapPartitionsRDD[2393] at mapPartitions at EdgeRDDImpl.scala:119)
(281,EdgeRDD MapPartitionsRDD[281] at mapPartitions at EdgeRDDImpl.scala:119)
(545,EdgeRDD MapPartitionsRDD[545] at mapPartitions at EdgeRDDImpl.scala:119)
(641,EdgeRDD MapPartitionsRDD[641] at mapPartitions at EdgeRDDImpl.scala:119)
(713,EdgeRDD MapPartitionsRDD[713] at mapPartitions at EdgeRDDImpl.scala:119)
(1865,EdgeRDD MapPartitionsRDD[1865] at mapPartitions at EdgeRDDImpl.scala:119)
(113,EdgeRDD MapPartitionsRDD[113] at mapPartitions at EdgeRDDImpl.scala:119)
(377,EdgeRDD MapPartitionsRDD[377] at mapPartitions at EdgeRDDImpl.scala:119)
(737,EdgeRDD MapPartitionsRDD[737] at mapPartitions at EdgeRDDImpl.scala:119)
(2129,EdgeRDD MapPartitionsRDD[2129] at mapPartitions at EdgeRDDImpl.scala:119)
(521,EdgeRDD MapPartitionsRDD[521] at mapPartitions at EdgeRDDImpl.scala:119)
(1841,EdgeRDD MapPartitionsRDD[1841] at mapPartitions at EdgeRDDImpl.scala:119)
(2369,EdgeRDD MapPartitionsRDD[2369] at mapPartitions at EdgeRDDImpl.scala:119)
(2390,VertexRDD, VertexRDD ZippedPartitionsRDD2[2390] at zipPartitions at VertexRDD.scala:322)
(473,EdgeRDD MapPartitionsRDD[473] at mapPartitions at EdgeRDDImpl.scala:119)
(209,EdgeRDD MapPartitionsRDD[209] at mapPartitions at EdgeRDDImpl.scala:119)
(617,EdgeRDD MapPartitionsRDD[617] at mapPartitions at EdgeRDDImpl.scala:119)
(1145,EdgeRDD MapPartitionsRDD[1145] at mapPartitions at EdgeRDDImpl.scala:119)
(1049,EdgeRDD MapPartitionsRDD[1049] at mapPartitions at EdgeRDDImpl.scala:119)
(1961,EdgeRDD MapPartitionsRDD[1961] at mapPartitions at EdgeRDDImpl.scala:119)
(1025,EdgeRDD MapPartitionsRDD[1025] at mapPartitions at EdgeRDDImpl.scala:119)
(497,EdgeRDD MapPartitionsRDD[497] at mapPartitions at EdgeRDDImpl.scala:119)
(1649,EdgeRDD MapPartitionsRDD[1649] at mapPartitions at EdgeRDDImpl.scala:119)
(1553,EdgeRDD MapPartitionsRDD[1553] at mapPartitions at EdgeRDDImpl.scala:119)
(1817,EdgeRDD MapPartitionsRDD[1817] at mapPartitions at EdgeRDDImpl.scala:119)
(1913,EdgeRDD MapPartitionsRDD[1913] at mapPartitions at EdgeRDDImpl.scala:119)
(1289,EdgeRDD MapPartitionsRDD[1289] at mapPartitions at EdgeRDDImpl.scala:119)
(1385,EdgeRDD MapPartitionsRDD[1385] at mapPartitions at EdgeRDDImpl.scala:119)
(1721,EdgeRDD MapPartitionsRDD[1721] at mapPartitions at EdgeRDDImpl.scala:119)
(2273,EdgeRDD MapPartitionsRDD[2273] at mapPartitions at EdgeRDDImpl.scala:119)
(1481,EdgeRDD MapPartitionsRDD[1481] at mapPartitions at EdgeRDDImpl.scala:119)
(1745,EdgeRDD MapPartitionsRDD[1745] at mapPartitions at EdgeRDDImpl.scala:119)
(401,EdgeRDD MapPartitionsRDD[401] at mapPartitions at EdgeRDDImpl.scala:119)
(2009,EdgeRDD MapPartitionsRDD[2009] at mapPartitions at EdgeRDDImpl.scala:119)
(2081,EdgeRDD MapPartitionsRDD[2081] at mapPartitions at EdgeRDDImpl.scala:119)
(929,EdgeRDD MapPartitionsRDD[929] at mapPartitions at EdgeRDDImpl.scala:119)
(1193,EdgeRDD MapPartitionsRDD[1193] at mapPartitions at EdgeRDDImpl.scala:119)
(833,EdgeRDD MapPartitionsRDD[833] at mapPartitions at EdgeRDDImpl.scala:119)
(36,EdgeRDD MapPartitionsRDD[36] at mapPartitionsWithIndex at GraphImpl.scala:106)
(1097,EdgeRDD MapPartitionsRDD[1097] at mapPartitions at EdgeRDDImpl.scala:119)
(1625,EdgeRDD MapPartitionsRDD[1625] at mapPartitions at EdgeRDDImpl.scala:119)
(1673,EdgeRDD MapPartitionsRDD[1673] at mapPartitions at EdgeRDDImpl.scala:119)
(305,EdgeRDD MapPartitionsRDD[305] at mapPartitions at EdgeRDDImpl.scala:119)
(2201,EdgeRDD MapPartitionsRDD[2201] at mapPartitions at EdgeRDDImpl.scala:119)
(2417,EdgeRDD MapPartitionsRDD[2417] at mapPartitions at EdgeRDDImpl.scala:119)
(1505,EdgeRDD MapPartitionsRDD[1505] at mapPartitions at EdgeRDDImpl.scala:119)
(2321,EdgeRDD MapPartitionsRDD[2321] at mapPartitions at EdgeRDDImpl.scala:119)
(2438,VertexRDD, VertexRDD ZippedPartitionsRDD2[2438] at zipPartitions at VertexRDD.scala:322)
(2297,EdgeRDD MapPartitionsRDD[2297] at mapPartitions at EdgeRDDImpl.scala:119)
(1769,EdgeRDD MapPartitionsRDD[1769] at mapPartitions at EdgeRDDImpl.scala:119)
(2177,EdgeRDD MapPartitionsRDD[2177] at mapPartitions at EdgeRDDImpl.scala:119)
 {code}

  was:
It seems that in {{OnlineLDAOptimizer, }}{{PeriodicGraphCheckpointer}} can not unpersit edges correctly.
{code:java}
scala> import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.clustering.LDA

scala> val dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
20/01/13 20:00:30 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan. dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]

scala> val lda = new LDA().setK(10).setMaxIter(100).setOptimizer("em")
lda: org.apache.spark.ml.clustering.LDA = lda_0e9a6cf09801

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> val model = lda.fit(dataset)
model: org.apache.spark.ml.clustering.LDAModel = DistributedLDAModel: uid=lda_0e9a6cf09801, k=10, numFeatures=11

scala> sc.getPersistentRDDs
res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(809 -> EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119, 1337 -> EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119, 977 -> EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119, 1073 -> EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119, 449 -> EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119, 1793 -> EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119, 185 -> EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119, 1001 -> EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119, 1601 -> EdgeRDD MapPartitionsRDD[1601] at mapPartitions a...scala> sc.getPersistentRDDs.size
res2: Int = 106scala> sc.getPersistentRDDs.foreach(println)
(809,EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119)
(1337,EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119)
(977,EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119)
(1073,EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119)
(449,EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119)
(1793,EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119)
(185,EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119)
(1001,EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119)
(1601,EdgeRDD MapPartitionsRDD[1601] at mapPartitions at EdgeRDDImpl.scala:119)
(1529,EdgeRDD MapPartitionsRDD[1529] at mapPartitions at EdgeRDDImpl.scala:119)
(1265,EdgeRDD MapPartitionsRDD[1265] at mapPartitions at EdgeRDDImpl.scala:119)
(257,EdgeRDD MapPartitionsRDD[257] at mapPartitions at EdgeRDDImpl.scala:119)
(1409,EdgeRDD MapPartitionsRDD[1409] at mapPartitions at EdgeRDDImpl.scala:119)
(1985,EdgeRDD MapPartitionsRDD[1985] at mapPartitions at EdgeRDDImpl.scala:119)
(785,EdgeRDD MapPartitionsRDD[785] at mapPartitions at EdgeRDDImpl.scala:119)
(1313,EdgeRDD MapPartitionsRDD[1313] at mapPartitions at EdgeRDDImpl.scala:119)
(1577,EdgeRDD MapPartitionsRDD[1577] at mapPartitions at EdgeRDDImpl.scala:119)
(881,EdgeRDD MapPartitionsRDD[881] at mapPartitions at EdgeRDDImpl.scala:119)
(29,VertexRDD, VertexRDD ZippedPartitionsRDD2[29] at zipPartitions at VertexRDD.scala:322)
(2105,EdgeRDD MapPartitionsRDD[2105] at mapPartitions at EdgeRDDImpl.scala:119)
(353,EdgeRDD MapPartitionsRDD[353] at mapPartitions at EdgeRDDImpl.scala:119)
(905,EdgeRDD MapPartitionsRDD[905] at mapPartitions at EdgeRDDImpl.scala:119)
(1169,EdgeRDD MapPartitionsRDD[1169] at mapPartitions at EdgeRDDImpl.scala:119)
(89,EdgeRDD MapPartitionsRDD[89] at mapPartitions at EdgeRDDImpl.scala:119)
(1433,EdgeRDD MapPartitionsRDD[1433] at mapPartitions at EdgeRDDImpl.scala:119)
(1697,EdgeRDD MapPartitionsRDD[1697] at mapPartitions at EdgeRDDImpl.scala:119)
(233,EdgeRDD MapPartitionsRDD[233] at mapPartitions at EdgeRDDImpl.scala:119)
(761,EdgeRDD MapPartitionsRDD[761] at mapPartitions at EdgeRDDImpl.scala:119)
(2441,EdgeRDD MapPartitionsRDD[2441] at mapPartitions at EdgeRDDImpl.scala:119)
(2249,EdgeRDD MapPartitionsRDD[2249] at mapPartitions at EdgeRDDImpl.scala:119)
(1217,EdgeRDD MapPartitionsRDD[1217] at mapPartitions at EdgeRDDImpl.scala:119)
(137,EdgeRDD MapPartitionsRDD[137] at mapPartitions at EdgeRDDImpl.scala:119)
(2414,VertexRDD, VertexRDD ZippedPartitionsRDD2[2414] at zipPartitions at VertexRDD.scala:322)
(65,EdgeRDD MapPartitionsRDD[65] at mapPartitions at EdgeRDDImpl.scala:119)
(329,EdgeRDD MapPartitionsRDD[329] at mapPartitions at EdgeRDDImpl.scala:119)
(665,EdgeRDD MapPartitionsRDD[665] at mapPartitions at EdgeRDDImpl.scala:119)
(1457,EdgeRDD MapPartitionsRDD[1457] at mapPartitions at EdgeRDDImpl.scala:119)
(2345,EdgeRDD MapPartitionsRDD[2345] at mapPartitions at EdgeRDDImpl.scala:119)
(1121,EdgeRDD MapPartitionsRDD[1121] at mapPartitions at EdgeRDDImpl.scala:119)
(593,EdgeRDD MapPartitionsRDD[593] at mapPartitions at EdgeRDDImpl.scala:119)
(857,EdgeRDD MapPartitionsRDD[857] at mapPartitions at EdgeRDDImpl.scala:119)
(1361,EdgeRDD MapPartitionsRDD[1361] at mapPartitions at EdgeRDDImpl.scala:119)
(1937,EdgeRDD MapPartitionsRDD[1937] at mapPartitions at EdgeRDDImpl.scala:119)
(1889,EdgeRDD MapPartitionsRDD[1889] at mapPartitions at EdgeRDDImpl.scala:119)
(2153,EdgeRDD MapPartitionsRDD[2153] at mapPartitions at EdgeRDDImpl.scala:119)
(569,EdgeRDD MapPartitionsRDD[569] at mapPartitions at EdgeRDDImpl.scala:119)
(1241,EdgeRDD MapPartitionsRDD[1241] at mapPartitions at EdgeRDDImpl.scala:119)
(2057,EdgeRDD MapPartitionsRDD[2057] at mapPartitions at EdgeRDDImpl.scala:119)
(953,EdgeRDD MapPartitionsRDD[953] at mapPartitions at EdgeRDDImpl.scala:119)
(425,EdgeRDD MapPartitionsRDD[425] at mapPartitions at EdgeRDDImpl.scala:119)
(2033,EdgeRDD MapPartitionsRDD[2033] at mapPartitions at EdgeRDDImpl.scala:119)
(32,EdgeRDD MapPartitionsRDD[32] at mapPartitions at EdgeRDDImpl.scala:119)
(161,EdgeRDD MapPartitionsRDD[161] at mapPartitions at EdgeRDDImpl.scala:119)
(689,EdgeRDD MapPartitionsRDD[689] at mapPartitions at EdgeRDDImpl.scala:119)
(2225,EdgeRDD MapPartitionsRDD[2225] at mapPartitions at EdgeRDDImpl.scala:119)
(2393,EdgeRDD MapPartitionsRDD[2393] at mapPartitions at EdgeRDDImpl.scala:119)
(281,EdgeRDD MapPartitionsRDD[281] at mapPartitions at EdgeRDDImpl.scala:119)
(545,EdgeRDD MapPartitionsRDD[545] at mapPartitions at EdgeRDDImpl.scala:119)
(641,EdgeRDD MapPartitionsRDD[641] at mapPartitions at EdgeRDDImpl.scala:119)
(713,EdgeRDD MapPartitionsRDD[713] at mapPartitions at EdgeRDDImpl.scala:119)
(1865,EdgeRDD MapPartitionsRDD[1865] at mapPartitions at EdgeRDDImpl.scala:119)
(113,EdgeRDD MapPartitionsRDD[113] at mapPartitions at EdgeRDDImpl.scala:119)
(377,EdgeRDD MapPartitionsRDD[377] at mapPartitions at EdgeRDDImpl.scala:119)
(737,EdgeRDD MapPartitionsRDD[737] at mapPartitions at EdgeRDDImpl.scala:119)
(2129,EdgeRDD MapPartitionsRDD[2129] at mapPartitions at EdgeRDDImpl.scala:119)
(521,EdgeRDD MapPartitionsRDD[521] at mapPartitions at EdgeRDDImpl.scala:119)
(1841,EdgeRDD MapPartitionsRDD[1841] at mapPartitions at EdgeRDDImpl.scala:119)
(2369,EdgeRDD MapPartitionsRDD[2369] at mapPartitions at EdgeRDDImpl.scala:119)
(2390,VertexRDD, VertexRDD ZippedPartitionsRDD2[2390] at zipPartitions at VertexRDD.scala:322)
(473,EdgeRDD MapPartitionsRDD[473] at mapPartitions at EdgeRDDImpl.scala:119)
(209,EdgeRDD MapPartitionsRDD[209] at mapPartitions at EdgeRDDImpl.scala:119)
(617,EdgeRDD MapPartitionsRDD[617] at mapPartitions at EdgeRDDImpl.scala:119)
(1145,EdgeRDD MapPartitionsRDD[1145] at mapPartitions at EdgeRDDImpl.scala:119)
(1049,EdgeRDD MapPartitionsRDD[1049] at mapPartitions at EdgeRDDImpl.scala:119)
(1961,EdgeRDD MapPartitionsRDD[1961] at mapPartitions at EdgeRDDImpl.scala:119)
(1025,EdgeRDD MapPartitionsRDD[1025] at mapPartitions at EdgeRDDImpl.scala:119)
(497,EdgeRDD MapPartitionsRDD[497] at mapPartitions at EdgeRDDImpl.scala:119)
(1649,EdgeRDD MapPartitionsRDD[1649] at mapPartitions at EdgeRDDImpl.scala:119)
(1553,EdgeRDD MapPartitionsRDD[1553] at mapPartitions at EdgeRDDImpl.scala:119)
(1817,EdgeRDD MapPartitionsRDD[1817] at mapPartitions at EdgeRDDImpl.scala:119)
(1913,EdgeRDD MapPartitionsRDD[1913] at mapPartitions at EdgeRDDImpl.scala:119)
(1289,EdgeRDD MapPartitionsRDD[1289] at mapPartitions at EdgeRDDImpl.scala:119)
(1385,EdgeRDD MapPartitionsRDD[1385] at mapPartitions at EdgeRDDImpl.scala:119)
(1721,EdgeRDD MapPartitionsRDD[1721] at mapPartitions at EdgeRDDImpl.scala:119)
(2273,EdgeRDD MapPartitionsRDD[2273] at mapPartitions at EdgeRDDImpl.scala:119)
(1481,EdgeRDD MapPartitionsRDD[1481] at mapPartitions at EdgeRDDImpl.scala:119)
(1745,EdgeRDD MapPartitionsRDD[1745] at mapPartitions at EdgeRDDImpl.scala:119)
(401,EdgeRDD MapPartitionsRDD[401] at mapPartitions at EdgeRDDImpl.scala:119)
(2009,EdgeRDD MapPartitionsRDD[2009] at mapPartitions at EdgeRDDImpl.scala:119)
(2081,EdgeRDD MapPartitionsRDD[2081] at mapPartitions at EdgeRDDImpl.scala:119)
(929,EdgeRDD MapPartitionsRDD[929] at mapPartitions at EdgeRDDImpl.scala:119)
(1193,EdgeRDD MapPartitionsRDD[1193] at mapPartitions at EdgeRDDImpl.scala:119)
(833,EdgeRDD MapPartitionsRDD[833] at mapPartitions at EdgeRDDImpl.scala:119)
(36,EdgeRDD MapPartitionsRDD[36] at mapPartitionsWithIndex at GraphImpl.scala:106)
(1097,EdgeRDD MapPartitionsRDD[1097] at mapPartitions at EdgeRDDImpl.scala:119)
(1625,EdgeRDD MapPartitionsRDD[1625] at mapPartitions at EdgeRDDImpl.scala:119)
(1673,EdgeRDD MapPartitionsRDD[1673] at mapPartitions at EdgeRDDImpl.scala:119)
(305,EdgeRDD MapPartitionsRDD[305] at mapPartitions at EdgeRDDImpl.scala:119)
(2201,EdgeRDD MapPartitionsRDD[2201] at mapPartitions at EdgeRDDImpl.scala:119)
(2417,EdgeRDD MapPartitionsRDD[2417] at mapPartitions at EdgeRDDImpl.scala:119)
(1505,EdgeRDD MapPartitionsRDD[1505] at mapPartitions at EdgeRDDImpl.scala:119)
(2321,EdgeRDD MapPartitionsRDD[2321] at mapPartitions at EdgeRDDImpl.scala:119)
(2438,VertexRDD, VertexRDD ZippedPartitionsRDD2[2438] at zipPartitions at VertexRDD.scala:322)
(2297,EdgeRDD MapPartitionsRDD[2297] at mapPartitions at EdgeRDDImpl.scala:119)
(1769,EdgeRDD MapPartitionsRDD[1769] at mapPartitions at EdgeRDDImpl.scala:119)
(2177,EdgeRDD MapPartitionsRDD[2177] at mapPartitions at EdgeRDDImpl.scala:119)
 {code}


> OnlineLDAOptimizer does not handle persistance correctly
> --------------------------------------------------------
>
>                 Key: SPARK-30503
>                 URL: https://issues.apache.org/jira/browse/SPARK-30503
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX, ML
>    Affects Versions: 3.0.0
>            Reporter: zhengruifeng
>            Priority: Major
>
> It seems that in {{OnlineLDAOptimizer, }}{{PeriodicGraphCheckpointer}} can not unpersit edges correctly.
> {code:java}
> scala> import org.apache.spark.ml.clustering.LDA
> import org.apache.spark.ml.clustering.LDA
> scala> val dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
> 20/01/13 20:00:30 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan. dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]
> scala> val lda = new LDA().setK(10).setMaxIter(100).setOptimizer("em")
> lda: org.apache.spark.ml.clustering.LDA = lda_0e9a6cf09801
> scala> sc.getPersistentRDDs
> res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
> scala> val model = lda.fit(dataset)
> model: org.apache.spark.ml.clustering.LDAModel = DistributedLDAModel: uid=lda_0e9a6cf09801, k=10, numFeatures=11
> scala> sc.getPersistentRDDs
> res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(809 -> EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119, 1337 -> EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119, 977 -> EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119, 1073 -> EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119, 449 -> EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119, 1793 -> EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119, 185 -> EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119, 1001 -> EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119, 1601 -> EdgeRDD MapPartitionsRDD[1601] at mapPartitions a...
> scala> sc.getPersistentRDDs.size
> res2: Int = 106
> scala> sc.getPersistentRDDs.foreach(println)
> (809,EdgeRDD MapPartitionsRDD[809] at mapPartitions at EdgeRDDImpl.scala:119)
> (1337,EdgeRDD MapPartitionsRDD[1337] at mapPartitions at EdgeRDDImpl.scala:119)
> (977,EdgeRDD MapPartitionsRDD[977] at mapPartitions at EdgeRDDImpl.scala:119)
> (1073,EdgeRDD MapPartitionsRDD[1073] at mapPartitions at EdgeRDDImpl.scala:119)
> (449,EdgeRDD MapPartitionsRDD[449] at mapPartitions at EdgeRDDImpl.scala:119)
> (1793,EdgeRDD MapPartitionsRDD[1793] at mapPartitions at EdgeRDDImpl.scala:119)
> (185,EdgeRDD MapPartitionsRDD[185] at mapPartitions at EdgeRDDImpl.scala:119)
> (1001,EdgeRDD MapPartitionsRDD[1001] at mapPartitions at EdgeRDDImpl.scala:119)
> (1601,EdgeRDD MapPartitionsRDD[1601] at mapPartitions at EdgeRDDImpl.scala:119)
> (1529,EdgeRDD MapPartitionsRDD[1529] at mapPartitions at EdgeRDDImpl.scala:119)
> (1265,EdgeRDD MapPartitionsRDD[1265] at mapPartitions at EdgeRDDImpl.scala:119)
> (257,EdgeRDD MapPartitionsRDD[257] at mapPartitions at EdgeRDDImpl.scala:119)
> (1409,EdgeRDD MapPartitionsRDD[1409] at mapPartitions at EdgeRDDImpl.scala:119)
> (1985,EdgeRDD MapPartitionsRDD[1985] at mapPartitions at EdgeRDDImpl.scala:119)
> (785,EdgeRDD MapPartitionsRDD[785] at mapPartitions at EdgeRDDImpl.scala:119)
> (1313,EdgeRDD MapPartitionsRDD[1313] at mapPartitions at EdgeRDDImpl.scala:119)
> (1577,EdgeRDD MapPartitionsRDD[1577] at mapPartitions at EdgeRDDImpl.scala:119)
> (881,EdgeRDD MapPartitionsRDD[881] at mapPartitions at EdgeRDDImpl.scala:119)
> (29,VertexRDD, VertexRDD ZippedPartitionsRDD2[29] at zipPartitions at VertexRDD.scala:322)
> (2105,EdgeRDD MapPartitionsRDD[2105] at mapPartitions at EdgeRDDImpl.scala:119)
> (353,EdgeRDD MapPartitionsRDD[353] at mapPartitions at EdgeRDDImpl.scala:119)
> (905,EdgeRDD MapPartitionsRDD[905] at mapPartitions at EdgeRDDImpl.scala:119)
> (1169,EdgeRDD MapPartitionsRDD[1169] at mapPartitions at EdgeRDDImpl.scala:119)
> (89,EdgeRDD MapPartitionsRDD[89] at mapPartitions at EdgeRDDImpl.scala:119)
> (1433,EdgeRDD MapPartitionsRDD[1433] at mapPartitions at EdgeRDDImpl.scala:119)
> (1697,EdgeRDD MapPartitionsRDD[1697] at mapPartitions at EdgeRDDImpl.scala:119)
> (233,EdgeRDD MapPartitionsRDD[233] at mapPartitions at EdgeRDDImpl.scala:119)
> (761,EdgeRDD MapPartitionsRDD[761] at mapPartitions at EdgeRDDImpl.scala:119)
> (2441,EdgeRDD MapPartitionsRDD[2441] at mapPartitions at EdgeRDDImpl.scala:119)
> (2249,EdgeRDD MapPartitionsRDD[2249] at mapPartitions at EdgeRDDImpl.scala:119)
> (1217,EdgeRDD MapPartitionsRDD[1217] at mapPartitions at EdgeRDDImpl.scala:119)
> (137,EdgeRDD MapPartitionsRDD[137] at mapPartitions at EdgeRDDImpl.scala:119)
> (2414,VertexRDD, VertexRDD ZippedPartitionsRDD2[2414] at zipPartitions at VertexRDD.scala:322)
> (65,EdgeRDD MapPartitionsRDD[65] at mapPartitions at EdgeRDDImpl.scala:119)
> (329,EdgeRDD MapPartitionsRDD[329] at mapPartitions at EdgeRDDImpl.scala:119)
> (665,EdgeRDD MapPartitionsRDD[665] at mapPartitions at EdgeRDDImpl.scala:119)
> (1457,EdgeRDD MapPartitionsRDD[1457] at mapPartitions at EdgeRDDImpl.scala:119)
> (2345,EdgeRDD MapPartitionsRDD[2345] at mapPartitions at EdgeRDDImpl.scala:119)
> (1121,EdgeRDD MapPartitionsRDD[1121] at mapPartitions at EdgeRDDImpl.scala:119)
> (593,EdgeRDD MapPartitionsRDD[593] at mapPartitions at EdgeRDDImpl.scala:119)
> (857,EdgeRDD MapPartitionsRDD[857] at mapPartitions at EdgeRDDImpl.scala:119)
> (1361,EdgeRDD MapPartitionsRDD[1361] at mapPartitions at EdgeRDDImpl.scala:119)
> (1937,EdgeRDD MapPartitionsRDD[1937] at mapPartitions at EdgeRDDImpl.scala:119)
> (1889,EdgeRDD MapPartitionsRDD[1889] at mapPartitions at EdgeRDDImpl.scala:119)
> (2153,EdgeRDD MapPartitionsRDD[2153] at mapPartitions at EdgeRDDImpl.scala:119)
> (569,EdgeRDD MapPartitionsRDD[569] at mapPartitions at EdgeRDDImpl.scala:119)
> (1241,EdgeRDD MapPartitionsRDD[1241] at mapPartitions at EdgeRDDImpl.scala:119)
> (2057,EdgeRDD MapPartitionsRDD[2057] at mapPartitions at EdgeRDDImpl.scala:119)
> (953,EdgeRDD MapPartitionsRDD[953] at mapPartitions at EdgeRDDImpl.scala:119)
> (425,EdgeRDD MapPartitionsRDD[425] at mapPartitions at EdgeRDDImpl.scala:119)
> (2033,EdgeRDD MapPartitionsRDD[2033] at mapPartitions at EdgeRDDImpl.scala:119)
> (32,EdgeRDD MapPartitionsRDD[32] at mapPartitions at EdgeRDDImpl.scala:119)
> (161,EdgeRDD MapPartitionsRDD[161] at mapPartitions at EdgeRDDImpl.scala:119)
> (689,EdgeRDD MapPartitionsRDD[689] at mapPartitions at EdgeRDDImpl.scala:119)
> (2225,EdgeRDD MapPartitionsRDD[2225] at mapPartitions at EdgeRDDImpl.scala:119)
> (2393,EdgeRDD MapPartitionsRDD[2393] at mapPartitions at EdgeRDDImpl.scala:119)
> (281,EdgeRDD MapPartitionsRDD[281] at mapPartitions at EdgeRDDImpl.scala:119)
> (545,EdgeRDD MapPartitionsRDD[545] at mapPartitions at EdgeRDDImpl.scala:119)
> (641,EdgeRDD MapPartitionsRDD[641] at mapPartitions at EdgeRDDImpl.scala:119)
> (713,EdgeRDD MapPartitionsRDD[713] at mapPartitions at EdgeRDDImpl.scala:119)
> (1865,EdgeRDD MapPartitionsRDD[1865] at mapPartitions at EdgeRDDImpl.scala:119)
> (113,EdgeRDD MapPartitionsRDD[113] at mapPartitions at EdgeRDDImpl.scala:119)
> (377,EdgeRDD MapPartitionsRDD[377] at mapPartitions at EdgeRDDImpl.scala:119)
> (737,EdgeRDD MapPartitionsRDD[737] at mapPartitions at EdgeRDDImpl.scala:119)
> (2129,EdgeRDD MapPartitionsRDD[2129] at mapPartitions at EdgeRDDImpl.scala:119)
> (521,EdgeRDD MapPartitionsRDD[521] at mapPartitions at EdgeRDDImpl.scala:119)
> (1841,EdgeRDD MapPartitionsRDD[1841] at mapPartitions at EdgeRDDImpl.scala:119)
> (2369,EdgeRDD MapPartitionsRDD[2369] at mapPartitions at EdgeRDDImpl.scala:119)
> (2390,VertexRDD, VertexRDD ZippedPartitionsRDD2[2390] at zipPartitions at VertexRDD.scala:322)
> (473,EdgeRDD MapPartitionsRDD[473] at mapPartitions at EdgeRDDImpl.scala:119)
> (209,EdgeRDD MapPartitionsRDD[209] at mapPartitions at EdgeRDDImpl.scala:119)
> (617,EdgeRDD MapPartitionsRDD[617] at mapPartitions at EdgeRDDImpl.scala:119)
> (1145,EdgeRDD MapPartitionsRDD[1145] at mapPartitions at EdgeRDDImpl.scala:119)
> (1049,EdgeRDD MapPartitionsRDD[1049] at mapPartitions at EdgeRDDImpl.scala:119)
> (1961,EdgeRDD MapPartitionsRDD[1961] at mapPartitions at EdgeRDDImpl.scala:119)
> (1025,EdgeRDD MapPartitionsRDD[1025] at mapPartitions at EdgeRDDImpl.scala:119)
> (497,EdgeRDD MapPartitionsRDD[497] at mapPartitions at EdgeRDDImpl.scala:119)
> (1649,EdgeRDD MapPartitionsRDD[1649] at mapPartitions at EdgeRDDImpl.scala:119)
> (1553,EdgeRDD MapPartitionsRDD[1553] at mapPartitions at EdgeRDDImpl.scala:119)
> (1817,EdgeRDD MapPartitionsRDD[1817] at mapPartitions at EdgeRDDImpl.scala:119)
> (1913,EdgeRDD MapPartitionsRDD[1913] at mapPartitions at EdgeRDDImpl.scala:119)
> (1289,EdgeRDD MapPartitionsRDD[1289] at mapPartitions at EdgeRDDImpl.scala:119)
> (1385,EdgeRDD MapPartitionsRDD[1385] at mapPartitions at EdgeRDDImpl.scala:119)
> (1721,EdgeRDD MapPartitionsRDD[1721] at mapPartitions at EdgeRDDImpl.scala:119)
> (2273,EdgeRDD MapPartitionsRDD[2273] at mapPartitions at EdgeRDDImpl.scala:119)
> (1481,EdgeRDD MapPartitionsRDD[1481] at mapPartitions at EdgeRDDImpl.scala:119)
> (1745,EdgeRDD MapPartitionsRDD[1745] at mapPartitions at EdgeRDDImpl.scala:119)
> (401,EdgeRDD MapPartitionsRDD[401] at mapPartitions at EdgeRDDImpl.scala:119)
> (2009,EdgeRDD MapPartitionsRDD[2009] at mapPartitions at EdgeRDDImpl.scala:119)
> (2081,EdgeRDD MapPartitionsRDD[2081] at mapPartitions at EdgeRDDImpl.scala:119)
> (929,EdgeRDD MapPartitionsRDD[929] at mapPartitions at EdgeRDDImpl.scala:119)
> (1193,EdgeRDD MapPartitionsRDD[1193] at mapPartitions at EdgeRDDImpl.scala:119)
> (833,EdgeRDD MapPartitionsRDD[833] at mapPartitions at EdgeRDDImpl.scala:119)
> (36,EdgeRDD MapPartitionsRDD[36] at mapPartitionsWithIndex at GraphImpl.scala:106)
> (1097,EdgeRDD MapPartitionsRDD[1097] at mapPartitions at EdgeRDDImpl.scala:119)
> (1625,EdgeRDD MapPartitionsRDD[1625] at mapPartitions at EdgeRDDImpl.scala:119)
> (1673,EdgeRDD MapPartitionsRDD[1673] at mapPartitions at EdgeRDDImpl.scala:119)
> (305,EdgeRDD MapPartitionsRDD[305] at mapPartitions at EdgeRDDImpl.scala:119)
> (2201,EdgeRDD MapPartitionsRDD[2201] at mapPartitions at EdgeRDDImpl.scala:119)
> (2417,EdgeRDD MapPartitionsRDD[2417] at mapPartitions at EdgeRDDImpl.scala:119)
> (1505,EdgeRDD MapPartitionsRDD[1505] at mapPartitions at EdgeRDDImpl.scala:119)
> (2321,EdgeRDD MapPartitionsRDD[2321] at mapPartitions at EdgeRDDImpl.scala:119)
> (2438,VertexRDD, VertexRDD ZippedPartitionsRDD2[2438] at zipPartitions at VertexRDD.scala:322)
> (2297,EdgeRDD MapPartitionsRDD[2297] at mapPartitions at EdgeRDDImpl.scala:119)
> (1769,EdgeRDD MapPartitionsRDD[1769] at mapPartitions at EdgeRDDImpl.scala:119)
> (2177,EdgeRDD MapPartitionsRDD[2177] at mapPartitions at EdgeRDDImpl.scala:119)
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org