You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text rendering.
Posted to commits@predictionio.apache.org by em...@apache.org on 2017/01/27 16:59:23 UTC

incubator-predictionio git commit: [PIO-45] Fix compression order in SelfCleaningDataSource. Thanks @jimlyndon

Repository: incubator-predictionio
Updated Branches:
  refs/heads/develop 8798547af -> 1a222657b


[PIO-45] Fix compression order in SelfCleaningDataSource. Thanks @jimlyndon


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/1a222657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/1a222657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/1a222657

Branch: refs/heads/develop
Commit: 1a222657b8438b56b2ef04eb14af5a3cf441bad0
Parents: 8798547
Author: EmergentOrder <le...@gmail.com>
Authored: Fri Jan 27 10:56:23 2017 -0600
Committer: EmergentOrder <le...@gmail.com>
Committed: Fri Jan 27 10:56:23 2017 -0600

----------------------------------------------------------------------
 .../apache/predictionio/core/SelfCleaningDataSource.scala   | 9 +++++----
 .../predictionio/core/SelfCleaningDataSourceTest.scala      | 6 +++++-
 core/src/test/scala/org/apache/predictionio/core/test.json  | 1 +
 3 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
index 9a2e68a..dc80566 100644
--- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
+++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala
@@ -229,7 +229,7 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def cleanPEvents(sc: SparkContext): RDD[Event] = {
-    val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime)
+    val pEvents = PEventStore.find(appName)(sc).sortBy(_.eventTime, false)
 
     val rdd = eventWindow match {
       case Some(ew) =>
@@ -274,7 +274,7 @@ trait SelfCleaningDataSource {
     */
   @DeveloperApi
   def cleanLEvents(): Iterable[Event] = {
-    val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime)
+    val lEvents = LEventStore.find(appName).toList.sortBy(_.eventTime).reverse
 
     val events = eventWindow match {
       case Some(ew) =>
@@ -305,13 +305,14 @@ trait SelfCleaningDataSource {
               e1.properties.fields
                 .filterKeys(f => !e2.properties.fields.contains(f))
           }
-          e1.copy(properties = DataMap(props))
+          e1.copy(properties = DataMap(props), eventTime = e2.eventTime)
         }
 
       case None =>
         events.reduce { (e1, e2) =>
           e1.copy(properties =
-            DataMap(e1.properties.fields ++ e2.properties.fields)
+            DataMap(e1.properties.fields ++ e2.properties.fields),
+            eventTime = e2.eventTime
           )
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
index e1ed832..1aed1ba 100644
--- a/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
+++ b/core/src/test/scala/org/apache/predictionio/core/SelfCleaningDataSourceTest.scala
@@ -76,7 +76,11 @@ class SelfCleaningDataSourceTest extends FunSuite with Inside with SharedSparkCo
     val itemEventsAfterCount = source.itemEvents(sc).count   
     val distinctEventsAfterCount = eventsAfter.map(x => 
       CleanedDataSourceTest.stripIdAndCreationTimeFromEvents(x)).distinct.count
-   
+
+    val nexusSet = eventsAfter.filter(x => x.event == "$set" && x.entityId == "Nexus").take(1)(0) 
+
+    nexusSet.properties.get[String]("available") should equal ("2016-03-18T13:31:49.016770+00:00")
+ 
     distinctEventsAfterCount should equal (eventsAfterCount)
     eventsBeforeCount should be > (eventsAfterCount) 
     itemEventsBeforeCount should be > (itemEventsAfterCount)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/1a222657/core/src/test/scala/org/apache/predictionio/core/test.json
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/core/test.json b/core/src/test/scala/org/apache/predictionio/core/test.json
index f38734f..7732d43 100644
--- a/core/src/test/scala/org/apache/predictionio/core/test.json
+++ b/core/src/test/scala/org/apache/predictionio/core/test.json
@@ -3,6 +3,7 @@
 {"event":"$set","entityType":"item","entityId":"Nexus","properties":{"categories":["Tablets","Electronics","Google2"], "test": ["testA", "testB"]},"eventTime":"2006-03-17T15:54:49.941Z","creationTime":"2006-03-17T15:54:49.945Z"}
 {"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkNogMMiTarDxQA","event":"$set","entityType":"item","entityId":"Nexus","properties":{"countries":["United States","Canada"]},"eventTime":"2016-03-17T15:55:49.992Z","creationTime":"2016-03-17T15:55:49.997Z"}
 {"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkOdrr3SJaHTlQQ","event":"$set","entityType":"item","entityId":"Nexus","properties":{"available":"2016-03-14T13:31:49.016770+00:00","date":"2016-03-16T13:31:49.016770+00:00","expires":"2016-03-18T13:31:49.016770+00:00"},"eventTime":"2016-03-17T15:55:50.045Z","creationTime":"2016-03-17T15:55:50.049Z"}
+{"eventId":"KpjNMVrQzY2s0TZhYB3vsAAAAVOFSkOdrr3SJaHTlQQ","event":"$set","entityType":"item","entityId":"Nexus","properties":{"available":"2016-03-18T13:31:49.016770+00:00","date":"2016-03-16T13:31:49.016770+00:00","expires":"2016-03-18T13:31:49.016770+00:00"},"eventTime":"2016-03-18T15:55:50.045Z","creationTime":"2016-03-18T15:55:50.049Z"}
 {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkNKjmJz4kil3F0","event":"$set","entityType":"item","entityId":"Surface","properties":{"categories":["Tablets","Electronics","Microsoft"]},"eventTime":"2016-03-17T15:55:49.962Z","creationTime":"2016-03-17T15:55:49.966Z"}
 {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkN-lNLH6dbWhjI","event":"$set","entityType":"item","entityId":"Surface","properties":{"countries":["United States","Canada"]},"eventTime":"2016-03-17T15:55:50.014Z","creationTime":"2016-03-17T15:55:50.018Z"}
 {"eventId":"MdgNfySNSsz0WVh1q6f3_gAAAVOFSkOmhp8HSvY0l2M","event":"$set","entityType":"item","entityId":"Surface","properties":{"available":"2016-03-15T08:43:49.016770+00:00","date":"2016-03-17T08:43:49.016770+00:00","expires":"2016-03-19T08:43:49.016770+00:00"},"eventTime":"2016-03-17T15:55:50.054Z","creationTime":"2016-03-17T15:55:50.060Z"}