You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2019/11/26 10:41:22 UTC
[james-project] 01/06: [Refactoring] rewrite InMemoryEventStore in
Scala
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fff0427a01dc79252f5ff6373c24101da683bf32
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Nov 25 15:33:50 2019 +0100
[Refactoring] rewrite InMemoryEventStore in Scala
---
event-sourcing/event-store-memory/pom.xml | 17 ++++
.../eventstore/memory/InMemoryEventStore.java | 99 ----------------------
.../eventstore/memory/InMemoryEventStore.scala | 58 +++++++++++++
3 files changed, 75 insertions(+), 99 deletions(-)
diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml
index aa57dd3..6f4b932 100644
--- a/event-sourcing/event-store-memory/pom.xml
+++ b/event-sourcing/event-store-memory/pom.xml
@@ -69,6 +69,23 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-java8-compat_${scala.base}</artifactId>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java b/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java
deleted file mode 100644
index de65ca6..0000000
--- a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.eventsourcing.eventstore.memory;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.james.eventsourcing.AggregateId;
-import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.eventstore.EventStoreFailedException;
-import org.apache.james.eventsourcing.eventstore.History;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-public class InMemoryEventStore implements EventStore {
-
- private final ConcurrentHashMap<AggregateId, History> store;
-
- public InMemoryEventStore() {
- this.store = new ConcurrentHashMap<>();
- }
-
- @Override
- public void appendAll(List<Event> events) {
- if (events.isEmpty()) {
- return;
- }
- AggregateId aggregateId = getAggregateId(events);
-
- if (!store.containsKey(aggregateId)) {
- appendToEmptyHistory(aggregateId, events);
- } else {
- appendToExistingHistory(aggregateId, events);
- }
- }
-
- private AggregateId getAggregateId(List<? extends Event> events) {
- Preconditions.checkArgument(!events.isEmpty());
- Preconditions.checkArgument(Event.belongsToSameAggregate(events));
-
- return events.stream()
- .map(Event::getAggregateId)
- .findFirst()
- .get();
- }
-
- private void appendToEmptyHistory(AggregateId aggregateId, List<Event> events) {
- History newHistory = History.of(events);
-
- History previousHistory = store.putIfAbsent(aggregateId, newHistory);
- if (previousHistory != null) {
- throw new EventStoreFailedException("Concurrent update to the EventStore detected");
- }
- }
-
- private void appendToExistingHistory(AggregateId aggregateId, List<? extends Event> events) {
- History currentHistory = store.get(aggregateId);
- List<Event> updatedEvents = updatedEvents(currentHistory, events);
- History updatedHistory = History.of(updatedEvents);
-
- boolean isReplaced = store.replace(aggregateId, currentHistory, updatedHistory);
- if (!isReplaced) {
- throw new EventStoreFailedException("Concurrent update to the EventStore detected");
- }
- }
-
- private List<Event> updatedEvents(History currentHistory, List<? extends Event> newEvents) {
- return ImmutableList.<Event>builder()
- .addAll(currentHistory.getEvents())
- .addAll(newEvents)
- .build();
- }
-
- @Override
- public History getEventsOfAggregate(AggregateId aggregateId) {
- return Optional.ofNullable(store.get(aggregateId))
- .orElse(History.empty());
- }
-}
diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
new file mode 100644
index 0000000..d7bf451
--- /dev/null
+++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+package org.apache.james.eventsourcing.eventstore.memory
+
+import java.util
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.base.Preconditions
+import org.apache.james.eventsourcing.eventstore.{EventStore, History}
+import org.apache.james.eventsourcing.{AggregateId, Event}
+
+import scala.collection.JavaConverters._
+
+class InMemoryEventStore() extends EventStore {
+ private val storeRef: AtomicReference[Map[AggregateId, History]] = new AtomicReference(Map().withDefault(_ => History.empty()))
+
+ override def appendAll(events: util.List[Event]): Unit = if (!events.isEmpty) appendAll(events.asScala)
+
+ override def getEventsOfAggregate(aggregateId: AggregateId): History = {
+ Preconditions.checkNotNull(aggregateId)
+ storeRef.get()(aggregateId)
+ }
+
+ def appendAll(events: Seq[Event]): Unit = {
+ val aggregateId: AggregateId = getAggregateId(events)
+ storeRef.updateAndGet(store => {
+ val updatedHistory = History.of((store(aggregateId).getEvents.asScala ++ events).asJava)
+ store.updated(aggregateId, updatedHistory)
+ })
+ }
+
+ private def getAggregateId(events: Seq[Event]): AggregateId = {
+ Preconditions.checkArgument(events.nonEmpty)
+ val aggregateId = events.head.getAggregateId
+ Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events))
+ aggregateId
+ }
+
+ private def belongsToSameAggregate(aggregateId: AggregateId, events: Seq[Event]) =
+ events.forall(_.getAggregateId.equals(aggregateId))
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org