You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2018/09/10 13:30:15 UTC
svn commit: r1840465 - in /jackrabbit/oak/trunk:
oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java
oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java
Author: tomekr
Date: Mon Sep 10 13:30:15 2018
New Revision: 1840465
URL: http://svn.apache.org/viewvc?rev=1840465&view=rev
Log:
OAK-7710: CompositeNodeStore does not dispatch external events to observers
Added:
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java
Modified:
jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java
Added: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java?rev=1840465&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java (added)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/composite/CompositeNodeStoreClusterObservationTest.java Mon Sep 10 13:30:15 2018
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.composite;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.TestNodeObserver;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
+import org.apache.jackrabbit.oak.spi.mount.Mounts;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class CompositeNodeStoreClusterObservationTest {
+
+ private MemoryDocumentStore ds;
+ private MemoryBlobStore bs;
+
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+ private CompositeNodeStore store;
+ private DocumentNodeStore remote;
+ private DocumentNodeStore globalStore;
+
+ private TestNodeObserver observer;
+
+ @Before
+ public void initStore() {
+
+ remote = createNodeStore(1);
+ globalStore = createNodeStore(2);
+
+ MountInfoProvider mip = Mounts.newBuilder().build();
+
+ List<MountedNodeStore> nonDefaultStores = Lists.newArrayList();
+ store = new CompositeNodeStore(mip, globalStore, nonDefaultStores);
+
+ observer = new TestNodeObserver("/test");
+ }
+
+ @Test
+ public void localObserver() throws CommitFailedException {
+ store.addObserver(observer);
+
+ NodeBuilder builder = store.getRoot().builder();
+ builder.child("test").setProperty("foo", "bar");
+ merge(store, builder);
+
+ assertTrue("Node added event not observed for local change", observer.added.containsKey("/test"));
+ }
+
+ @Test
+ public void remoteObserver() throws CommitFailedException {
+ store.addObserver(observer);
+
+ NodeBuilder builder = remote.getRoot().builder();
+ builder.child("test").setProperty("foo", "bar");
+ merge(remote, builder);
+
+ remote.runBackgroundOperations();
+ globalStore.runBackgroundOperations();
+
+ assertTrue("Node added event not observed for local change", observer.added.containsKey("/test"));
+ }
+
+ private static void merge(NodeStore store, NodeBuilder builder)
+ throws CommitFailedException {
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ private DocumentNodeStore createNodeStore(int clusterId) {
+ if (ds == null) {
+ ds = new MemoryDocumentStore();
+ }
+ if (bs == null) {
+ bs = new MemoryBlobStore();
+ }
+ return createNodeStore(clusterId, ds, bs);
+ }
+
+ private DocumentNodeStore createNodeStore(int clusterId,
+ DocumentStore ds, BlobStore bs) {
+ return builderProvider.newBuilder().setDocumentStore(ds)
+ .setBlobStore(bs).setClusterId(clusterId)
+ .setAsyncDelay(0).build();
+ }
+}
Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java?rev=1840465&r1=1840464&r2=1840465&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/CompositeNodeStore.java Mon Sep 10 13:30:15 2018
@@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.api.Com
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.composite.checks.NodeStoreChecks;
+import org.apache.jackrabbit.oak.spi.commit.ChangeDispatcher;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -36,6 +37,7 @@ import org.apache.jackrabbit.oak.spi.mou
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +50,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -101,7 +102,7 @@ public class CompositeNodeStore implemen
final CompositionContext ctx;
- private final List<Observer> observers = new CopyOnWriteArrayList<>();
+ private final ChangeDispatcher dispatcher;
private final Lock mergeLock;
@@ -114,6 +115,14 @@ public class CompositeNodeStore implemen
this.ctx = new CompositionContext(mip, globalStore, nonDefaultStore, nodeStateMonitor, nodeBuilderMonitor);
this.ignoreReadOnlyWritePaths = new TreeSet<>(ignoreReadOnlyWritePaths);
this.mergeLock = new ReentrantLock();
+ this.dispatcher = new ChangeDispatcher(getRoot());
+
+ // setup observation proxy mechanism for underlying store for events not dispatched from within our
+ // merge
+ if (globalStore instanceof Observable) {
+ Observable globalStoreObservable = (Observable) globalStore;
+ globalStoreObservable.addObserver(new MountedNodeStoreObserver());
+ }
}
@Override
@@ -167,9 +176,6 @@ public class CompositeNodeStore implemen
}
CompositeNodeState newRoot = ctx.createRootNodeState(resultStates);
- for (Observer observer : observers) {
- observer.contentChanged(newRoot, info);
- }
return newRoot;
} finally {
mergeLock.unlock();
@@ -440,14 +446,7 @@ public class CompositeNodeStore implemen
@Override
public Closeable addObserver(final Observer observer) {
- observer.contentChanged(getRoot(), CommitInfo.EMPTY_EXTERNAL);
- observers.add(observer);
- return new Closeable() {
- @Override
- public void close() throws IOException {
- observers.remove(observer);
- }
- };
+ return dispatcher.addObserver(observer);
}
private Set<String> getIgnoredPaths(Set<String> paths) {
@@ -543,4 +542,16 @@ public class CompositeNodeStore implemen
buildMountCount, mipMountCount);
}
}
+
+ private class MountedNodeStoreObserver implements Observer {
+ @Override
+ public void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) {
+ Map<MountedNodeStore, NodeState> nodeStates = newHashMap();
+ for (MountedNodeStore nodeStore : ctx.getNonDefaultStores()) {
+ nodeStates.put(nodeStore, nodeStore.getNodeStore().getRoot());
+ }
+ nodeStates.put(ctx.getGlobalStore(), root);
+ dispatcher.contentChanged(ctx.createRootNodeState(nodeStates), info);
+ }
+ }
}