You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by GitBox <gi...@apache.org> on 2021/11/22 11:21:04 UTC

[GitHub] [james-project] chibenwa commented on a change in pull request #757: [WIP] JAMES-3534 Identity/set update should work on existing server-set Identities

chibenwa commented on a change in pull request #757:
URL: https://github.com/apache/james-project/pull/757#discussion_r754172505



##########
File path: server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
##########
@@ -120,15 +131,42 @@ class IdentityRepository @Inject()(customIdentityDao: CustomIdentityDAO, identit
       SMono.error(ForbiddenSendFromException(creationRequest.email))
     }
 
-  def list(user: Username): Publisher[Identity] = SFlux.merge(Seq(
-    customIdentityDao.list(user),
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(Schedulers.elastic())
-      .flatMapMany(SFlux.fromIterable)))
+  def list(user: Username): Publisher[Identity] = {
+    val customIdentities: SFlux[(String, Identity)] = SFlux.fromPublisher(customIdentityDao.list(user))
+      .map(e=> (IdentityOrigin.CUSTOM, e))
 
-  def update(user: Username, identityId: IdentityId, identityUpdate: IdentityUpdate): Publisher[Unit] = customIdentityDao.update(user, identityId, identityUpdate)
+    val serverSetIdentities: SFlux[(String, Identity)] = SMono.fromCallable(() => identityFactory.listIdentities(user))
+      .subscribeOn(Schedulers.elastic())
+      .flatMapMany(SFlux.fromIterable)
+      .map(e=> (IdentityOrigin.SERVER_SET, e))
+
+    SFlux.merge(Seq(customIdentities, serverSetIdentities))
+      .groupBy(_._2.id)
+      .flatMap(_.reduce((id1, id2) => id1._1 match {
+        case IdentityOrigin.CUSTOM => id1
+        case _ => id2
+      }))
+      .map(_._2)
+  }
+
+  def update(user: Username, identityId: IdentityId, identityUpdateRequest: IdentityUpdateRequest): Publisher[Unit] =
+    SMono.fromPublisher(customIdentityDao.update(user, identityId, identityUpdateRequest))
+      .onErrorResume {
+        case error: IdentityNotFoundException =>
+          SFlux.fromIterable(identityFactory.listIdentities(user))
+            .filter(identity => identity.id.equals(identityId))
+            .next()
+            .flatMap(identity => SMono.fromPublisher(customIdentityDao.save(user, identityId, identityUpdateRequest.asCreationRequest(identity.email))))
+            .switchIfEmpty(SMono.error(error))
+            .`then`()
+      }
 
   def delete(username: Username, ids: Seq[IdentityId]): Publisher[Unit] = customIdentityDao.delete(username, ids)
 }
 
-case class IdentityNotFoundException(id: IdentityId) extends RuntimeException(s"$id could not be found")
\ No newline at end of file
+case class IdentityNotFoundException(id: IdentityId) extends RuntimeException(s"$id could not be found")
+
+object IdentityOrigin {
+  val CUSTOM: String = "custom"
+  val SERVER_SET: String = "server_set"
+}

Review comment:
       Use a case object?

##########
File path: server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
##########
@@ -120,15 +131,42 @@ class IdentityRepository @Inject()(customIdentityDao: CustomIdentityDAO, identit
       SMono.error(ForbiddenSendFromException(creationRequest.email))
     }
 
-  def list(user: Username): Publisher[Identity] = SFlux.merge(Seq(
-    customIdentityDao.list(user),
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(Schedulers.elastic())
-      .flatMapMany(SFlux.fromIterable)))
+  def list(user: Username): Publisher[Identity] = {
+    val customIdentities: SFlux[(String, Identity)] = SFlux.fromPublisher(customIdentityDao.list(user))
+      .map(e=> (IdentityOrigin.CUSTOM, e))
 
-  def update(user: Username, identityId: IdentityId, identityUpdate: IdentityUpdate): Publisher[Unit] = customIdentityDao.update(user, identityId, identityUpdate)
+    val serverSetIdentities: SFlux[(String, Identity)] = SMono.fromCallable(() => identityFactory.listIdentities(user))
+      .subscribeOn(Schedulers.elastic())
+      .flatMapMany(SFlux.fromIterable)
+      .map(e=> (IdentityOrigin.SERVER_SET, e))

Review comment:
       Missing a space between `e` and `=>`
   
   Also we can likely find a better name than `e` and if we can't we could use `_`.

##########
File path: server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
##########
@@ -120,15 +131,42 @@ class IdentityRepository @Inject()(customIdentityDao: CustomIdentityDAO, identit
       SMono.error(ForbiddenSendFromException(creationRequest.email))
     }
 
-  def list(user: Username): Publisher[Identity] = SFlux.merge(Seq(
-    customIdentityDao.list(user),
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(Schedulers.elastic())
-      .flatMapMany(SFlux.fromIterable)))
+  def list(user: Username): Publisher[Identity] = {
+    val customIdentities: SFlux[(String, Identity)] = SFlux.fromPublisher(customIdentityDao.list(user))
+      .map(e=> (IdentityOrigin.CUSTOM, e))
 
-  def update(user: Username, identityId: IdentityId, identityUpdate: IdentityUpdate): Publisher[Unit] = customIdentityDao.update(user, identityId, identityUpdate)
+    val serverSetIdentities: SFlux[(String, Identity)] = SMono.fromCallable(() => identityFactory.listIdentities(user))
+      .subscribeOn(Schedulers.elastic())
+      .flatMapMany(SFlux.fromIterable)
+      .map(e=> (IdentityOrigin.SERVER_SET, e))
+
+    SFlux.merge(Seq(customIdentities, serverSetIdentities))
+      .groupBy(_._2.id)

Review comment:
       groupBy is fine.

##########
File path: server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
##########
@@ -120,15 +131,42 @@ class IdentityRepository @Inject()(customIdentityDao: CustomIdentityDAO, identit
       SMono.error(ForbiddenSendFromException(creationRequest.email))
     }
 
-  def list(user: Username): Publisher[Identity] = SFlux.merge(Seq(
-    customIdentityDao.list(user),
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(Schedulers.elastic())
-      .flatMapMany(SFlux.fromIterable)))
+  def list(user: Username): Publisher[Identity] = {
+    val customIdentities: SFlux[(String, Identity)] = SFlux.fromPublisher(customIdentityDao.list(user))
+      .map(e=> (IdentityOrigin.CUSTOM, e))
 
-  def update(user: Username, identityId: IdentityId, identityUpdate: IdentityUpdate): Publisher[Unit] = customIdentityDao.update(user, identityId, identityUpdate)
+    val serverSetIdentities: SFlux[(String, Identity)] = SMono.fromCallable(() => identityFactory.listIdentities(user))
+      .subscribeOn(Schedulers.elastic())
+      .flatMapMany(SFlux.fromIterable)
+      .map(e=> (IdentityOrigin.SERVER_SET, e))
+
+    SFlux.merge(Seq(customIdentities, serverSetIdentities))
+      .groupBy(_._2.id)
+      .flatMap(_.reduce((id1, id2) => id1._1 match {
+        case IdentityOrigin.CUSTOM => id1
+        case _ => id2
+      }))

Review comment:
       IMO we should have an `identity.merge(otherIdentity)` method.

##########
File path: server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/CustomIdentityDAO.scala
##########
@@ -120,15 +131,42 @@ class IdentityRepository @Inject()(customIdentityDao: CustomIdentityDAO, identit
       SMono.error(ForbiddenSendFromException(creationRequest.email))
     }
 
-  def list(user: Username): Publisher[Identity] = SFlux.merge(Seq(
-    customIdentityDao.list(user),
-    SMono.fromCallable(() => identityFactory.listIdentities(user))
-      .subscribeOn(Schedulers.elastic())
-      .flatMapMany(SFlux.fromIterable)))
+  def list(user: Username): Publisher[Identity] = {
+    val customIdentities: SFlux[(String, Identity)] = SFlux.fromPublisher(customIdentityDao.list(user))
+      .map(e=> (IdentityOrigin.CUSTOM, e))

Review comment:
       Missing a space between `e` and `=>`
   
   Also we can likely find a better name than `e` and if we can't we could use `_`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org