You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2019/11/26 10:41:22 UTC

[james-project] 01/06: [Refactoring] rewrite InMemoryEventStore in Scala

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fff0427a01dc79252f5ff6373c24101da683bf32
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Nov 25 15:33:50 2019 +0100

    [Refactoring] rewrite InMemoryEventStore in Scala
---
 event-sourcing/event-store-memory/pom.xml          | 17 ++++
 .../eventstore/memory/InMemoryEventStore.java      | 99 ----------------------
 .../eventstore/memory/InMemoryEventStore.scala     | 58 +++++++++++++
 3 files changed, 75 insertions(+), 99 deletions(-)

diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml
index aa57dd3..6f4b932 100644
--- a/event-sourcing/event-store-memory/pom.xml
+++ b/event-sourcing/event-store-memory/pom.xml
@@ -69,6 +69,23 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-java8-compat_${scala.base}</artifactId>
+        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
 
 </project>
\ No newline at end of file
diff --git a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java b/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java
deleted file mode 100644
index de65ca6..0000000
--- a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.eventsourcing.eventstore.memory;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.james.eventsourcing.AggregateId;
-import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.eventstore.EventStoreFailedException;
-import org.apache.james.eventsourcing.eventstore.History;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-public class InMemoryEventStore implements EventStore {
-
-    private final ConcurrentHashMap<AggregateId, History> store;
-
-    public InMemoryEventStore() {
-        this.store = new ConcurrentHashMap<>();
-    }
-
-    @Override
-    public void appendAll(List<Event> events) {
-        if (events.isEmpty()) {
-            return;
-        }
-        AggregateId aggregateId = getAggregateId(events);
-
-        if (!store.containsKey(aggregateId)) {
-            appendToEmptyHistory(aggregateId, events);
-        } else {
-            appendToExistingHistory(aggregateId, events);
-        }
-    }
-
-    private AggregateId getAggregateId(List<? extends Event> events) {
-        Preconditions.checkArgument(!events.isEmpty());
-        Preconditions.checkArgument(Event.belongsToSameAggregate(events));
-
-        return events.stream()
-            .map(Event::getAggregateId)
-            .findFirst()
-            .get();
-    }
-
-    private void appendToEmptyHistory(AggregateId aggregateId, List<Event> events) {
-        History newHistory = History.of(events);
-
-        History previousHistory = store.putIfAbsent(aggregateId, newHistory);
-        if (previousHistory != null) {
-            throw new EventStoreFailedException("Concurrent update to the EventStore detected");
-        }
-    }
-
-    private void appendToExistingHistory(AggregateId aggregateId, List<? extends Event> events) {
-        History currentHistory = store.get(aggregateId);
-        List<Event> updatedEvents = updatedEvents(currentHistory, events);
-        History updatedHistory = History.of(updatedEvents);
-
-        boolean isReplaced = store.replace(aggregateId, currentHistory, updatedHistory);
-        if (!isReplaced) {
-            throw new EventStoreFailedException("Concurrent update to the EventStore detected");
-        }
-    }
-
-    private List<Event> updatedEvents(History currentHistory, List<? extends Event> newEvents) {
-        return ImmutableList.<Event>builder()
-            .addAll(currentHistory.getEvents())
-            .addAll(newEvents)
-            .build();
-    }
-
-    @Override
-    public History getEventsOfAggregate(AggregateId aggregateId) {
-        return Optional.ofNullable(store.get(aggregateId))
-            .orElse(History.empty());
-    }
-}
diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
new file mode 100644
index 0000000..d7bf451
--- /dev/null
+++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+package org.apache.james.eventsourcing.eventstore.memory
+
+import java.util
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.base.Preconditions
+import org.apache.james.eventsourcing.eventstore.{EventStore, History}
+import org.apache.james.eventsourcing.{AggregateId, Event}
+
+import scala.collection.JavaConverters._
+
+class InMemoryEventStore() extends EventStore {
+  private val storeRef: AtomicReference[Map[AggregateId, History]] = new AtomicReference(Map().withDefault(_ => History.empty()))
+
+  override def appendAll(events: util.List[Event]): Unit = if (!events.isEmpty) appendAll(events.asScala)
+
+  override def getEventsOfAggregate(aggregateId: AggregateId): History = {
+    Preconditions.checkNotNull(aggregateId)
+    storeRef.get()(aggregateId)
+  }
+
+  def appendAll(events: Seq[Event]): Unit = {
+    val aggregateId: AggregateId = getAggregateId(events)
+    storeRef.updateAndGet(store => {
+      val updatedHistory = History.of((store(aggregateId).getEvents.asScala ++ events).asJava)
+      store.updated(aggregateId, updatedHistory)
+    })
+  }
+
+  private def getAggregateId(events: Seq[Event]): AggregateId = {
+    Preconditions.checkArgument(events.nonEmpty)
+    val aggregateId = events.head.getAggregateId
+    Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events))
+    aggregateId
+  }
+
+  private def belongsToSameAggregate(aggregateId: AggregateId, events: Seq[Event]) =
+    events.forall(_.getAggregateId.equals(aggregateId))
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org