You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/01/23 13:35:16 UTC
svn commit: r1560666 - in /jackrabbit/oak/trunk/oak-core: ./
src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/
src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/
src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ src/test/java/...
Author: chetanm
Date: Thu Jan 23 12:35:15 2014
New Revision: 1560666
URL: http://svn.apache.org/r1560666
Log:
OAK-891 - Use DirectMemory as Level 2/Offheap cache
Adding OffHeap cache support which uses Kryo and DirectMemory
-- OffHeapCache interface to remove direct dependency on DM and Kryo classes
-- Cache invalidation logic takes care of off heap cache content also
-- Care is taken to not deserialize the complete NodeDocument just for the cache consistency
check.
-- Custom Kryo serializer for compact storage of NodeDocument and Revision classes
-- Off heap cache is used if off heap cache size is set to a positive value (greater than zero) in MongoMK.Builder
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/pom.xml
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java
Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Thu Jan 23 12:35:15 2014
@@ -252,6 +252,20 @@
<scope>provided</scope>
</dependency>
+ <!-- Required for OffHeap storage and serialization-->
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.22</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-cache</artifactId>
+ <version>0.2</version>
+ <optional>true</optional>
+ </dependency>
+
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java Thu Jan 23 12:35:15 2014
@@ -19,6 +19,12 @@
package org.apache.jackrabbit.oak.plugins.mongomk;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
@@ -34,11 +40,6 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
abstract class CacheInvalidator {
static final Logger LOG = LoggerFactory.getLogger(CacheInvalidator.class);
@@ -87,11 +88,12 @@ abstract class CacheInvalidator {
@Override
public InvalidationResult invalidateCache() {
InvalidationResult result = new InvalidationResult();
- Map<String, NodeDocument> cacheMap = documentStore.getCache();
- result.cacheSize = cacheMap.size();
- for (String key : cacheMap.keySet()) {
- documentStore.invalidateCache(Collection.NODES, key);
+ int size = 0;
+ for (Map.Entry<String, ? extends CachedNodeDocument> e : documentStore.getCacheEntries()) {
+ size++;
+ documentStore.invalidateCache(Collection.NODES, e.getKey());
}
+ result.cacheSize = size;
return result;
}
}
@@ -107,12 +109,18 @@ abstract class CacheInvalidator {
@Override
public InvalidationResult invalidateCache() {
- final Map<String, NodeDocument> cacheMap = documentStore.getCache();
final InvalidationResult result = new InvalidationResult();
- result.cacheSize = cacheMap.size();
+
+ int size = 0;
+ List<String> cachedKeys = new ArrayList<String>();
+ for (Map.Entry<String, ? extends CachedNodeDocument> e : documentStore.getCacheEntries()) {
+ size++;
+ cachedKeys.add(e.getKey());
+ }
+ result.cacheSize = size;
QueryBuilder query = QueryBuilder.start(Document.ID)
- .in(cacheMap.keySet());
+ .in(cachedKeys);
// Fetch only the lastRev map and id
final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
@@ -121,12 +129,13 @@ abstract class CacheInvalidator {
// Fetch lastRev for each such node
DBCursor cursor = nodes.find(query.get(), keys);
result.queryCount++;
+
for (DBObject obj : cursor) {
result.cacheEntriesProcessedCount++;
String id = (String) obj.get(Document.ID);
Number modCount = (Number) obj.get(Document.MOD_COUNT);
- NodeDocument cachedDoc = documentStore.getIfCached(Collection.NODES, id);
+ CachedNodeDocument cachedDoc = documentStore.getCachedNodeDoc(id);
if (cachedDoc != null
&& !Objects.equal(cachedDoc.getModCount(), modCount)) {
documentStore.invalidateCache(Collection.NODES, id);
@@ -141,7 +150,7 @@ abstract class CacheInvalidator {
private static class HierarchicalInvalidator extends CacheInvalidator {
-
+
private static final TreeTraverser<TreeNode> TRAVERSER = new TreeTraverser<TreeNode>() {
@Override
public Iterable<TreeNode> children(TreeNode root) {
@@ -160,11 +169,7 @@ abstract class CacheInvalidator {
@Override
public InvalidationResult invalidateCache() {
final InvalidationResult result = new InvalidationResult();
- Map<String, NodeDocument> cacheMap = documentStore.getCache();
- TreeNode root = constructTreeFromPaths(cacheMap.keySet());
-
- // Invalidation stats
- result.cacheSize = cacheMap.size();
+ TreeNode root = constructTreeFromPaths(documentStore.getCacheEntries(), result);
// Time at which the check is started. All NodeDocuments which
// are found to be up-to-date would be marked touched at this time
@@ -219,7 +224,7 @@ abstract class CacheInvalidator {
String id = (String) obj.get(Document.ID);
final TreeNode tn2 = sameLevelNodes.get(id);
- NodeDocument cachedDoc = tn2.getDocument();
+ CachedNodeDocument cachedDoc = tn2.getDocument();
if (cachedDoc != null) {
boolean noChangeInModCount = Objects.equal(latestModCount, cachedDoc.getModCount());
if (noChangeInModCount) {
@@ -256,11 +261,19 @@ abstract class CacheInvalidator {
return result;
}
- private TreeNode constructTreeFromPaths(Set<String> ids) {
+ private TreeNode constructTreeFromPaths(Iterable<? extends Map.Entry<String, ? extends CachedNodeDocument>> entries,
+ InvalidationResult result) {
TreeNode root = new TreeNode("");
- for (String id : ids) {
+ for (Map.Entry<String, ? extends CachedNodeDocument> e : entries) {
TreeNode current = root;
- String path = Utils.getPathFromId(id);
+
+ //TODO Split documents are immutable hence no need to
+ //check them
+ //TODO Need to determine way to determine if the
+ //key is referring to a split document
+
+ String path = Utils.getPathFromId(e.getKey());
+ result.cacheSize++;
for (String name : PathUtils.elements(path)) {
current = current.child(name);
}
@@ -329,12 +342,12 @@ abstract class CacheInvalidator {
documentStore.invalidateCache(Collection.NODES, getId());
}
- public NodeDocument getDocument() {
- return documentStore.getIfCached(Collection.NODES, id);
+ public CachedNodeDocument getDocument() {
+ return documentStore.getCachedNodeDoc(id);
}
public boolean isUptodate(long time) {
- NodeDocument doc = documentStore.getIfCached(Collection.NODES, id);
+ CachedNodeDocument doc = documentStore.getCachedNodeDoc(id);
if (doc != null) {
return doc.isUpToDate(time);
} else {
@@ -346,7 +359,7 @@ abstract class CacheInvalidator {
}
public void markUptodate(long cacheCheckTime) {
- NodeDocument doc = getDocument();
+ CachedNodeDocument doc = getDocument();
if (doc == null) {
return;
}
@@ -358,7 +371,7 @@ abstract class CacheInvalidator {
return id;
}
- private void markUptodate(long cacheCheckTime, NodeDocument upToDateRoot) {
+ private void markUptodate(long cacheCheckTime, CachedNodeDocument upToDateRoot) {
for (TreeNode tn : children.values()) {
tn.markUptodate(cacheCheckTime, upToDateRoot);
}
@@ -366,8 +379,8 @@ abstract class CacheInvalidator {
markUptodate(getId(), cacheCheckTime, upToDateRoot);
}
- private void markUptodate(String key, long time, NodeDocument upToDateRoot) {
- NodeDocument doc = documentStore.getIfCached(Collection.NODES, key);
+ private void markUptodate(String key, long time, CachedNodeDocument upToDateRoot) {
+ CachedNodeDocument doc = documentStore.getCachedNodeDoc(key);
if (doc == null) {
return;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java Thu Jan 23 12:35:15 2014
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.mongomk;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -32,16 +34,21 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import com.google.common.base.Splitter;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp.Operation;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.ForwardingListener;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.NodeDocOffHeapCache;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.OffHeapCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
-import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import com.mongodb.BasicDBObject;
@@ -106,10 +113,30 @@ public class MongoDocumentStore implemen
nodes.ensureIndex(index, options);
// TODO expire entries if the parent was changed
- nodesCache = builder.buildCache(builder.getDocumentCacheSize());
+ if(builder.useOffHeapCache()){
+ nodesCache = createOffHeapCache(builder);
+ }else{
+ nodesCache = builder.buildCache(builder.getDocumentCacheSize());
+ }
+
cacheStats = new CacheStats(nodesCache, "MongoMk-Documents", builder.getWeigher(),
builder.getDocumentCacheSize());
}
+
+ private Cache<String , NodeDocument> createOffHeapCache(MongoMK.Builder builder){ // builds the on-heap cache and wraps it in a NodeDocOffHeapCache
+ ForwardingListener<String , NodeDocument> listener = ForwardingListener.newInstance(); // no delegate yet; the NodeDocOffHeapCache constructor installs one via setDelegate
+ // On-heap (primary) cache: bounded by the document cache size, reports removals to the listener above.
+ Cache<String,NodeDocument> primaryCache = CacheBuilder.newBuilder()
+ .weigher(builder.getWeigher())
+ .maximumWeight(builder.getDocumentCacheSize())
+ .removalListener(listener)
+ .recordStats()
+ .build();
+ // The wrapper is given the same listener instance so it can observe removals from primaryCache.
+ Cache<String,NodeDocument> cache =
+ new NodeDocOffHeapCache( primaryCache, listener, builder, this );
+ return cache;
+ }
private static long start() {
return LOG_TIME ? System.currentTimeMillis() : 0;
@@ -568,14 +595,35 @@ public class MongoDocumentStore implemen
LOG.debug("MongoDB time: " + timeSum);
}
nodes.getDB().getMongo().close();
+
+ if(nodesCache instanceof Closeable){
+ try {
+ ((Closeable)nodesCache).close();
+ } catch (IOException e) {
+
+ LOG.warn("Error occurred while closing Off Heap Cache",e);
+ }
+ }
}
public CacheStats getCacheStats() {
return cacheStats;
}
- Map<String, NodeDocument> getCache() {
- return Collections.unmodifiableMap(nodesCache.asMap());
+ Iterable<? extends Map.Entry<String, ? extends CachedNodeDocument>> getCacheEntries() { // all cached entries, including off-heap ones when nodesCache is an OffHeapCache
+ if(nodesCache instanceof OffHeapCache){
+ return Iterables.concat(nodesCache.asMap().entrySet(), // entries exposed by the cache's own map view come first
+ ((OffHeapCache)nodesCache).offHeapEntriesMap().entrySet()); // followed by the entries held off heap
+ }
+ return nodesCache.asMap().entrySet(); // plain cache: its map view is the complete set of entries
+ }
+
+ CachedNodeDocument getCachedNodeDoc(String id){ // looks up the cached entry for id without loading it from the store
+ if(nodesCache instanceof OffHeapCache){
+ return ((OffHeapCache) nodesCache).getCachedDocument(id); // NodeDocOffHeapCache checks the on-heap cache first, then the off-heap one
+ }
+ // No off-heap layer: a plain getIfPresent lookup is sufficient.
+ return nodesCache.getIfPresent(id);
}
private static void log(String message, Object... args) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Thu Jan 23 12:35:15 2014
@@ -661,6 +661,7 @@ public class MongoMK implements MicroKer
private long docChildrenCacheSize;
private boolean useSimpleRevision;
private long splitDocumentAgeMillis = 5 * 60 * 1000;
+ private long offHeapCacheSize = -1;
public Builder() {
memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -846,6 +847,19 @@ public class MongoMK implements MicroKer
return splitDocumentAgeMillis;
}
+ public boolean useOffHeapCache() { // true only when a positive off-heap cache size has been configured (the default is -1)
+ return this.offHeapCacheSize > 0;
+ }
+
+ public long getOffHeapCacheSize() { // configured off-heap cache size, -1 unless changed; presumably in bytes (NodeDocOffHeapCache uses it as max memory) - TODO confirm
+ return offHeapCacheSize;
+ }
+
+ public Builder offHeapCacheSize(long offHeapCacheSize) { // sets the off-heap cache size; values <= 0 leave the off-heap cache disabled (see useOffHeapCache)
+ this.offHeapCacheSize = offHeapCacheSize;
+ return this; // same builder, to allow call chaining
+ }
+
/**
* Open the MongoMK instance using the configured options.
*
@@ -855,13 +869,6 @@ public class MongoMK implements MicroKer
return new MongoMK(this);
}
- /**
- * Create a cache.
- *
- * @param <V> the value type
- * @param maxWeight
- * @return the cache
- */
public <V extends CacheValue> Cache<String, V> buildCache(long maxWeight) {
if (LIRS_CACHE) {
return CacheLIRS.newBuilder().weigher(weigher).
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * Listener which forwards removal notifications to a delegate that can be set or
+ * replaced after construction (see setDelegate). It is used to bridge multiple
+ * cache instances; while no delegate is set, notifications are ignored.
+ */
+public class ForwardingListener<K, V>
+ implements RemovalListener<K, V> {
+ private RemovalListener<K, V> delegate; // target of forwarded notifications; null until one is supplied
+ /** Creates a listener without a delegate. */
+ public ForwardingListener() {
+ }
+ /** Creates a listener that forwards to the given delegate. */
+ public ForwardingListener(RemovalListener<K, V> delegate) {
+ this.delegate = delegate;
+ }
+ /** Passes the notification on to the delegate, or does nothing when no delegate is set. */
+ @Override
+ public void onRemoval(RemovalNotification<K, V> notification) {
+ if (delegate != null) {
+ delegate.onRemoval(notification);
+ }
+ }
+ /** Sets or replaces the delegate that receives subsequent notifications. */
+ public void setDelegate(RemovalListener<K, V> delegate) {
+ this.delegate = delegate;
+ }
+ /** Factory for a listener without a delegate; lets callers rely on type inference for K and V. */
+ public static <K, V> ForwardingListener<K, V> newInstance() {
+ return new ForwardingListener<K, V>();
+ }
+ /** Factory for a listener that forwards to the given delegate. */
+ public static <K, V> ForwardingListener<K, V> newInstance(RemovalListener<K, V> delegate) {
+ return new ForwardingListener<K, V>(delegate);
+ }
+}
\ No newline at end of file
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.NavigableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+
+/**
+ * Factory to create Kryo instances customized for managing NodeDocument
+ */
+public class KryoFactory {
+ /** Creates a Kryo instance with custom serializers for Revision and NodeDocument; documentStore is handed to the NodeDocument serializer. */
+ public static Kryo createInstance(DocumentStore documentStore){
+ Kryo kryo = new Kryo();
+ kryo.setReferences(false); // turn off Kryo's object reference tracking
+ kryo.register(Revision.class, new Serializers.RevisionSerizlizer());
+ kryo.register(NodeDocument.class, new Serializers.NodeDocumentSerializer(documentStore));
+ kryo.register(NavigableMap.class); // registered without a custom serializer
+
+ //All the required classes need to be registered explicitly; this is switched on only after the registrations above
+ kryo.setRegistrationRequired(true);
+ return kryo;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoSerializer implements Closeable { // serializes and deserializes objects with Kryo instances borrowed from a KryoPool
+ /* initial size of each pooled Output buffer; the -1 maximum passed to Output below means the buffer may grow without limit */
+ private static final int BUFFER_SIZE = 1024;
+ /* pool from which a holder (Kryo instance plus output buffer) is borrowed for every call */
+ private final KryoPool pool;
+ /** Creates a serializer that borrows its Kryo instances from the given pool. */
+ public KryoSerializer(KryoPool pool) {
+ this.pool = pool;
+ }
+ /** Serializes obj with a pooled Kryo instance and returns the written bytes. */
+ public <T> byte[] serialize(T obj)
+ throws IOException {
+ KryoHolder kh = null;
+ try {
+ kh = pool.get();
+ kh.reset(); // discard whatever an earlier call left in the reused buffer
+ kh.kryo.writeObject(kh.output, obj);
+ return kh.output.toBytes();
+ } finally { // hand the holder back to the pool even when serialization fails
+ if (kh != null) {
+ pool.done(kh);
+ }
+ }
+ }
+ /** Reads an instance of clazz from source using a pooled Kryo instance. */
+
+ public <T> T deserialize(byte[] source, Class<T> clazz)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ KryoHolder kh = null;
+ try {
+ kh = pool.get();
+ Input input = new Input(source); // fresh Input per call; the holder's Output buffer is not used when reading
+ return kh.kryo.readObject(input, clazz);
+ } finally { // hand the holder back to the pool even when deserialization fails
+ if (kh != null) {
+ pool.done(kh);
+ }
+ }
+ }
+
+ /**
+ * Closes the pool, releasing the idle Kryo instances it currently holds
+ */
+ @Override
+ public void close() throws IOException {
+ pool.close();
+ }
+ /** Pairs a Kryo instance with a reusable Output buffer so that both are pooled together. */
+ private static class KryoHolder {
+ final Kryo kryo;
+ final Output output = new Output(BUFFER_SIZE, -1); // starts at BUFFER_SIZE; -1 sets no maximum size
+
+ KryoHolder(Kryo kryo) {
+ this.kryo = kryo;
+ }
+ /** Clears the output buffer so the holder can be reused for another write. */
+ private void reset() {
+ output.clear();
+ }
+ }
+ /** Pool of KryoHolder objects backed by a concurrent queue; holders are created on demand. */
+ public static class KryoPool {
+ private final Queue<KryoHolder> objects = new ConcurrentLinkedQueue<KryoHolder>();
+ /** Returns an idle holder, or a new one built from createInstance() when the queue is empty. */
+ public KryoHolder get() {
+ KryoHolder kh;
+ if ((kh = objects.poll()) == null) {
+ kh = new KryoHolder(createInstance());
+ }
+ return kh;
+ }
+ /** Returns a holder to the pool so that a later get() can reuse it. */
+ public void done(KryoHolder kh) {
+ objects.offer(kh);
+ }
+ /** Drops all idle holders; a holder handed back through done() afterwards is queued again. */
+ public void close() {
+ objects.clear();
+ }
+
+ /**
+ * Creates the Kryo instance for a new holder, with reference tracking disabled.
+ * Sub classes can customize the Kryo instance by overriding this method
+ * @return created Kryo instance
+ */
+ protected Kryo createInstance() {
+ Kryo kryo = new Kryo();
+ kryo.setReferences(false);
+ return kryo;
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.ForwardingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.MemoryManagerServiceImpl;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.cache.CacheValue;
+import org.apache.jackrabbit.oak.plugins.mongomk.CachedNodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MongoMK;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.cache.AbstractCache.SimpleStatsCounter;
+import static com.google.common.cache.AbstractCache.StatsCounter;
+
+public class NodeDocOffHeapCache
+ extends ForwardingCache.SimpleForwardingCache<String, NodeDocument>
+ implements Closeable, OffHeapCache {
+ private final StatsCounter statsCounter = new SimpleStatsCounter();
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Cache<String, NodeDocReference> offHeapCache;
+ private final CacheStats offHeapCacheStats;
+
+ private final MemoryManagerService<NodeDocument> memoryManager;
+
+ private final KryoSerializer serializer;
+
+ public NodeDocOffHeapCache(Cache<String, NodeDocument> delegate,
+ ForwardingListener<String, NodeDocument> forwardingListener,
+ MongoMK.Builder builder,
+ DocumentStore documentStore) {
+ super(delegate);
+ forwardingListener.setDelegate(new PrimaryRemovalListener());
+
+ final long maxMemory = builder.getOffHeapCacheSize();
+
+ //TODO We may also expire the entries from cache if not accessed for some time
+ offHeapCache = CacheBuilder.newBuilder()
+ .weigher(builder.getWeigher())
+ .maximumWeight(maxMemory)
+ .removalListener(new SecondaryRemovalListener())
+ .recordStats()
+ .build();
+
+ offHeapCacheStats = new CacheStats(offHeapCache, "MongoMk-Documents-L2", builder.getWeigher(),
+ builder.getOffHeapCacheSize());
+
+ final long bufferSize = Ram.Gb(1);
+ int noOfBuffers = Math.max(1, (int) (maxMemory / bufferSize));
+ int buffSize = (int) Math.min(maxMemory, bufferSize);
+
+ //TODO Check if UnsafeMemoryManagerServiceImpl should be preferred
+ //on Sun/Oracle JDK
+ memoryManager = new MemoryManagerServiceImpl<NodeDocument>();
+ memoryManager.init(noOfBuffers, buffSize);
+
+ serializer = new KryoSerializer(new OakKryoPool(documentStore));
+ }
+
+ @Override
+ public NodeDocument getIfPresent(Object key) {
+ NodeDocument result = super.getIfPresent(key);
+ if (result == null) {
+ result = retrieve(key, false);
+ }
+ return result;
+ }
+
+
+ @Override
+ public NodeDocument get(final String key, final Callable<? extends NodeDocument> valueLoader)
+ throws ExecutionException {
+ return super.get(key, new Callable<NodeDocument>() {
+ @Override
+ public NodeDocument call()
+ throws Exception {
+ //Check in offHeap first
+ NodeDocument result = retrieve(key, true);
+
+ //Not found in L2 then load
+ if (result == null) {
+ result = valueLoader.call();
+ }
+ return result;
+ }
+ });
+ }
+
+ @Override
+ public ImmutableMap<String, NodeDocument> getAllPresent(Iterable<?> keys) {
+ @SuppressWarnings("unchecked") List<String> list = Lists.newArrayList((Iterable<String>) keys);
+ ImmutableMap<String, NodeDocument> result = super.getAllPresent(list);
+
+ //All the requested keys found then no
+ //need to check L2
+ if (result.size() == list.size()) {
+ return result;
+ }
+
+ //Look up value from L2
+ Map<String, NodeDocument> r2 = Maps.newHashMap(result);
+ for (String key : list) {
+ if (!result.containsKey(key)) {
+ NodeDocument val = retrieve(key, false);
+ if (val != null) {
+ r2.put(key, val);
+ }
+ }
+ }
+ return ImmutableMap.copyOf(r2);
+ }
+
+ @Override
+ public void invalidate(Object key) {
+ super.invalidate(key);
+ offHeapCache.invalidate(key);
+ }
+
+ @Override
+ public void invalidateAll(Iterable<?> keys) {
+ super.invalidateAll(keys);
+ offHeapCache.invalidateAll(keys);
+ }
+
+ @Override
+ public void invalidateAll() {
+ super.invalidateAll();
+ offHeapCache.invalidateAll();
+ }
+
+ @Override
+ public void close() throws IOException {
+ memoryManager.close();
+ serializer.close();
+ }
+
+ @Override
+ public Map<String, ? extends CachedNodeDocument> offHeapEntriesMap() {
+ return Collections.unmodifiableMap(offHeapCache.asMap());
+ }
+
+ @Override
+ public CacheStats getCacheStats() {
+ return offHeapCacheStats;
+ }
+
+ /**
+  * Returns the cached representation of the document with the given id
+  * without deserializing an off heap entry: the primary (L1) cache is checked
+  * first, otherwise the reference held by the off heap cache is returned.
+  *
+  * @param id document id used as cache key
+  * @return the cached document or off heap reference, {@code null} if neither
+  *         cache has an entry for the id
+  */
+ @Nullable
+ @Override
+ public CachedNodeDocument getCachedDocument(String id) {
+     CachedNodeDocument cached = super.getIfPresent(id);
+     if (cached == null) {
+         cached = offHeapCache.getIfPresent(id);
+     }
+     return cached;
+ }
+
+ /**
+  * Fetches a document from the off heap cache and deserializes it. The
+  * outcome is recorded in the stats counter: a miss when there is no entry
+  * or the entry cannot be deserialized, a successful load (with the elapsed
+  * time) otherwise.
+  *
+  * @param key cache entry key to retrieve
+  * @param invalidateAfterRetrieve {@code true} if the off heap entry has to be
+  *          invalidated once read. This is the case when the loaded value
+  *          is made part of the L1 cache
+  * @return the deserialized document or {@code null} if it is not available
+  */
+ private NodeDocument retrieve(Object key, boolean invalidateAfterRetrieve) {
+     Stopwatch timer = Stopwatch.createStarted();
+
+     NodeDocReference reference = offHeapCache.getIfPresent(key);
+     if (reference == null) {
+         statsCounter.recordMisses(1);
+         return null;
+     }
+
+     NodeDocument document = reference.getDocument();
+     if (document == null) {
+         statsCounter.recordMisses(1);
+     } else {
+         statsCounter.recordLoadSuccess(timer.elapsed(TimeUnit.NANOSECONDS));
+     }
+
+     //Once the value moves to the L1 cache there is no point in
+     //keeping a copy in the backend
+     if (invalidateAfterRetrieve) {
+         offHeapCache.invalidate(key);
+     }
+
+     return document;
+ }
+
+ /**
+  * Removal listener of the primary (L1) cache. Entries evicted because of
+  * size are moved to the off heap (L2) cache, while entries removed
+  * explicitly or replaced are invalidated in L2 as well.
+  */
+ private class PrimaryRemovalListener implements RemovalListener<String, NodeDocument> {
+
+     @Override
+     public void onRemoval(RemovalNotification<String, NodeDocument> n) {
+         RemovalCause cause = n.getCause();
+         if (cause == RemovalCause.SIZE) {
+             //Evicted for lack of space in L1 so keep a copy in L2,
+             //except for the NULL placeholder document
+             NodeDocument evicted = n.getValue();
+             if (evicted != NodeDocument.NULL) {
+                 offHeapCache.put(n.getKey(), new NodeDocReference(n.getKey(), evicted));
+             }
+         } else if (cause == RemovalCause.EXPLICIT || cause == RemovalCause.REPLACED) {
+             //The L1 value is gone or superseded so drop the L2 copy too
+             offHeapCache.invalidate(n.getKey());
+         }
+     }
+ }
+
+ /**
+  * Removal listener of the off heap (L2) cache. It hands the pointer of a
+  * removed entry back to the memory manager.
+  */
+ private class SecondaryRemovalListener implements RemovalListener<String, NodeDocReference> {
+     @Override
+     public void onRemoval(RemovalNotification<String, NodeDocReference> notification) {
+         NodeDocReference removed = notification.getValue();
+         if (removed == null) {
+             return;
+         }
+
+         //The pointer is null when the document could not be serialized
+         if (removed.getPointer() == null) {
+             return;
+         }
+         memoryManager.free(removed.getPointer());
+     }
+ }
+
+ /**
+  * Value type of the off heap (L2) cache. The serialized document is stored
+  * through the memory manager and referenced via a pointer, while the
+  * attributes exposed by {@link CachedNodeDocument} are kept on heap so that
+  * they can be read without deserializing the document.
+  */
+ private class NodeDocReference implements CachedNodeDocument, CacheValue {
+     private final Number modCount;
+     private final long created;
+     private final AtomicLong lastCheckTime;
+     private final Pointer<NodeDocument> documentPointer;
+     private final String key;
+
+     /**
+      * Captures the cache related attributes of the document and stores its
+      * serialized form via the memory manager.
+      *
+      * @param key cache key of the document, used in log messages
+      * @param doc document to reference
+      */
+     public NodeDocReference(String key, NodeDocument doc) {
+         this.modCount = doc.getModCount();
+         this.created = doc.getCreated();
+         this.lastCheckTime = new AtomicLong(doc.getLastCheckTime());
+         this.documentPointer = serialize(doc);
+         this.key = key;
+     }
+
+     @Override
+     public Number getModCount() {
+         return modCount;
+     }
+
+     @Override
+     public long getCreated() {
+         return created;
+     }
+
+     @Override
+     public long getLastCheckTime() {
+         return lastCheckTime.get();
+     }
+
+     @Override
+     public void markUpToDate(long checkTime) {
+         lastCheckTime.set(checkTime);
+     }
+
+     @Override
+     public boolean isUpToDate(long lastCheckTime) {
+         return lastCheckTime <= this.lastCheckTime.get();
+     }
+
+     /**
+      * Deserializes the referenced document.
+      *
+      * @return the document or {@code null} if it was never stored or cannot
+      *         be deserialized
+      */
+     @CheckForNull
+     public NodeDocument getDocument() {
+         return deserialize(documentPointer);
+     }
+
+     /**
+      * @return pointer to the serialized document or {@code null} if the
+      *         document could not be stored
+      */
+     @CheckForNull
+     public Pointer<NodeDocument> getPointer() {
+         return documentPointer;
+     }
+
+     /**
+      * Serializes the document and stores the bytes via the memory manager.
+      *
+      * @param doc document to serialize
+      * @return pointer to the stored bytes or {@code null} if serialization
+      *         failed or the memory manager did not return a pointer
+      */
+     @CheckForNull
+     private Pointer<NodeDocument> serialize(NodeDocument doc) {
+         try {
+             byte[] payload = serializer.serialize(doc);
+             Pointer<NodeDocument> ptr = memoryManager.store(payload, 0);
+
+             //The memory manager may not hand back a pointer (presumably
+             //when it has no free space left). Treat that like a failed
+             //serialization instead of failing with a NullPointerException
+             if (ptr == null) {
+                 log.debug("Not able to store doc {} in off heap memory", doc.getId());
+                 return null;
+             }
+
+             ptr.setClazz(NodeDocument.class);
+             return ptr;
+         } catch (IOException e) {
+             log.warn("Not able to serialize doc {}", doc.getId(), e);
+             return null;
+         }
+     }
+
+     /**
+      * Reads the bytes referenced by the pointer and deserializes them. The
+      * resulting document is marked up to date with the check time tracked
+      * by this reference.
+      *
+      * @param pointer pointer to the serialized document, may be {@code null}
+      * @return the document or {@code null} if the pointer is {@code null} or
+      *         deserialization failed
+      */
+     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+     @CheckForNull
+     private NodeDocument deserialize(@CheckForNull Pointer<NodeDocument> pointer) {
+         try {
+             //If there was some error in serializing then pointer
+             // would be null
+             if (pointer == null) {
+                 return null;
+             }
+
+             //TODO Look for a way to have a direct access to MemoryManager buffer
+             //for Kryo so that no copying is involved
+
+             final byte[] value;
+
+             //Workaround for DIRECTMEMORY-137 Concurrent access via same pointer
+             //can lead to issues. For now synchronizing on the pointer
+             synchronized (pointer) {
+                 value = memoryManager.retrieve(pointer);
+             }
+
+             NodeDocument doc = serializer.deserialize(value, pointer.getClazz());
+             doc.markUpToDate(getLastCheckTime());
+             return doc;
+         } catch (Exception e) {
+             log.warn("Not able to deserialize doc {} with pointer {}", new Object[]{key, pointer, e});
+         }
+         return null;
+     }
+
+     /**
+      * @return a fixed base value of 168 plus the size reported by the
+      *         pointer, if any. NOTE(review): 168 is presumably an estimate
+      *         of the on heap footprint of this reference - confirm
+      */
+     @Override
+     public int getMemory() {
+         int result = 168;
+
+         if (documentPointer != null) {
+             result += (int) documentPointer.getSize();
+         }
+         return result;
+     }
+ }
+
+ /**
+ * Kryo pool whose instances are created via {@link KryoFactory} for the
+ * document store passed to the constructor.
+ */
+ private static class OakKryoPool extends KryoSerializer.KryoPool {
+ private final DocumentStore documentStore;
+
+ public OakKryoPool(DocumentStore documentStore) {
+ this.documentStore = documentStore;
+ }
+
+ @Override
+ protected Kryo createInstance() {
+ return KryoFactory.createInstance(documentStore);
+ }
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.cache.Cache;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.mongomk.CachedNodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+
+/**
+ * An OffHeap cache manages the cache value in an off heap storage.
+ *
+ * This interface is required to avoid direct dependency on DirectMemory
+ * and Kryo classes
+ */
+public interface OffHeapCache extends Cache<String, NodeDocument> {
+
+ /**
+ * Returns the entries held by the off heap cache as a map from cache key
+ * to the cached representation of the document.
+ */
+ Map<String, ? extends CachedNodeDocument> offHeapEntriesMap();
+
+ /**
+ * Returns the statistics of the off heap cache.
+ */
+ CacheStats getCacheStats();
+
+ /**
+ * Returns the cached representation of the document with the given id.
+ *
+ * @param id document id used as cache key
+ * @return the cached document, or {@code null} if there is none
+ */
+ @Nullable
+ CachedNodeDocument getCachedDocument(String id);
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Kryo serializers for the types stored in the off heap cache.
+ *
+ * The serialization and deserialization logic would need to maintain the order
+ * of read and writes
+ */
+public class Serializers {
+
+    /**
+     * Serializes a {@link Revision} as timestamp, counter, clusterId and
+     * branch flag, in that order.
+     */
+    public static class RevisionSerizlizer extends Serializer<Revision> {
+        @Override
+        public void write(Kryo kryo, Output o, Revision r) {
+            o.writeLong(r.getTimestamp(), true);
+            o.writeInt(r.getCounter(), true);
+            o.writeInt(r.getClusterId(), true);
+            o.writeBoolean(r.isBranch());
+        }
+
+        @Override
+        public Revision read(Kryo kryo, Input i, Class<Revision> revisionClass) {
+            return new Revision(
+                    i.readLong(true), //timestamp
+                    i.readInt(true), //counter
+                    i.readInt(true), //clusterId
+                    i.readBoolean() //branch
+            );
+        }
+    }
+
+    /**
+     * Serializes a sealed {@link NodeDocument} as its creation time followed
+     * by the number of entries and then, for each entry, the key, the value
+     * class and the value itself.
+     */
+    public static class NodeDocumentSerializer extends Serializer<NodeDocument> {
+        private final DocumentStore documentStore;
+
+        /**
+         * @param documentStore store passed to the documents created on read
+         */
+        public NodeDocumentSerializer(DocumentStore documentStore) {
+            this.documentStore = documentStore;
+        }
+
+        /**
+         * Writes the given document.
+         *
+         * @throws IllegalArgumentException if the document is not sealed
+         */
+        @Override
+        public void write(Kryo kryo, Output o, NodeDocument doc) {
+            checkArgument(doc.isSealed(), "Cannot serialized non seal document [%s]", doc.getId());
+            o.writeLong(doc.getCreated(), true);
+
+            //The same key set is used for the count and for the iteration so
+            //that the number of entries written matches the announced size
+            Set<String> keys = doc.keySet();
+            o.writeInt(keys.size(), true);
+
+            //Created on first use and then shared by all map valued entries
+            //of this document
+            RevisionedMapSerializer mapSerializer = null;
+
+            //Here assumption is that data has contents of following type
+            //Primitive wrapper
+            //NavigableMap of Revision -> Value
+            for (String key : keys) {
+                o.writeString(key);
+                Object val = doc.get(key);
+                if (val instanceof NavigableMap) {
+                    kryo.writeClass(o, NavigableMap.class);
+                    if (mapSerializer == null) {
+                        mapSerializer = new RevisionedMapSerializer(kryo);
+                    }
+                    mapSerializer.write(kryo, o, (Map) val);
+                } else {
+                    kryo.writeClass(o, val.getClass());
+                    kryo.writeObject(o, val);
+                }
+            }
+        }
+
+        /**
+         * Reads a document written by {@link #write(Kryo, Output, NodeDocument)}.
+         *
+         * @return the sealed document
+         */
+        @Override
+        public NodeDocument read(Kryo kryo, Input input, Class<NodeDocument> nodeDocumentClass) {
+            long created = input.readLong(true);
+
+            int mapSize = input.readInt(true);
+            NodeDocument doc = new NodeDocument(documentStore, created);
+
+            //Created on first use and then shared by all map valued entries
+            //of this document
+            RevisionedMapSerializer mapSerializer = null;
+            for (int i = 0; i < mapSize; i++) {
+                String key = input.readString();
+                Registration reg = kryo.readClass(input);
+                Object value;
+                if (reg.getType() == NavigableMap.class) {
+                    if (mapSerializer == null) {
+                        mapSerializer = new RevisionedMapSerializer(kryo);
+                    }
+                    value = mapSerializer.read(kryo, input, Map.class);
+                } else {
+                    value = kryo.readObject(input, reg.getType());
+                }
+                doc.put(key, value);
+            }
+
+            //Seal the doc once all changes done
+            doc.seal();
+
+            return doc;
+        }
+
+    }
+
+    /**
+     * Map serializer for maps keyed by {@link Revision}. Keys are never null
+     * and are handled by the serializer registered for {@link Revision}; maps
+     * are recreated as {@link TreeMap} ordered by
+     * {@link StableRevisionComparator#REVERSE}.
+     */
+    private static class RevisionedMapSerializer extends MapSerializer {
+
+        public RevisionedMapSerializer(Kryo kryo) {
+            setKeysCanBeNull(false);
+            setKeyClass(Revision.class, kryo.getSerializer(Revision.class));
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        protected Map create(Kryo kryo, Input input, Class<Map> type) {
+            return new TreeMap(StableRevisionComparator.REVERSE);
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java Thu Jan 23 12:35:15 2014
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.mongomk;
+import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.mongomk.util.MongoConnection;
@@ -66,7 +67,7 @@ public class CacheInvalidationIT extends
createTree(root,paths);
c1.merge(root, EmptyHook.INSTANCE, null);
- assertEquals(totalPaths,ds(c1).getCache().size());
+ assertEquals(totalPaths, Iterables.size(ds(c1).getCacheEntries()));
runBgOps(c1,c2);
return totalPaths;
@@ -89,7 +90,7 @@ public class CacheInvalidationIT extends
//Only 2 entries /a and /a/d would be invalidated
// '/' would have been added to cache in start of backgroundRead
//itself
- assertEquals(totalPaths - 2,ds(c1).getCache().size());
+ assertEquals(totalPaths - 2,Iterables.size(ds(c1).getCacheEntries()));
}
@Test
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Ordering;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round trip tests for the Kryo serializers used by the off heap cache.
+ */
+public class SerializerTest {
+    private DocumentStore store = new MemoryDocumentStore();
+
+    /**
+     * A revision, with and without the branch flag, survives a round trip.
+     */
+    @Test
+    public void revisionSerialization() {
+        Revision plain = new Revision(System.currentTimeMillis(), 1, 5);
+        assertEquals(plain, deserialize(plain));
+
+        Revision branch = new Revision(System.currentTimeMillis(), 1, 5, true);
+        assertEquals(branch, deserialize(branch));
+    }
+
+    /**
+     * Empty documents, documents with simple values and documents with
+     * revisioned maps survive a round trip.
+     */
+    @Test
+    public void nodeDocSerialization() {
+        long time = System.currentTimeMillis();
+
+        //Document without any entry
+        NodeDocument empty = new NodeDocument(store,time);
+        empty.seal();
+        checkSame(empty, (NodeDocument) deserialize(empty));
+
+        //Document with plain values only
+        NodeDocument simple = new NodeDocument(store,time);
+        simple.put("_id","b1");
+        simple.put("a2","b2");
+        simple.seal();
+        checkSame(simple, (NodeDocument) deserialize(simple));
+
+        //Document with revisioned maps
+        NodeDocument revisioned = new NodeDocument(store,time);
+        revisioned.put("_id","b1");
+        revisioned.put("a2",createRevisionMap());
+        revisioned.put("a3",createRevisionMap());
+        revisioned.seal();
+
+        NodeDocument copy = (NodeDocument) deserialize(revisioned);
+        checkSame(revisioned, copy);
+
+        //Assert that revision keys are sorted
+        NavigableMap<Revision,Object> revisions = (NavigableMap<Revision, Object>) copy.get("a2");
+        assertTrue(Ordering.from(StableRevisionComparator.REVERSE).isOrdered(revisions.keySet()));
+    }
+
+    /**
+     * Writes the object with a fresh Kryo instance and reads it back from
+     * the produced bytes.
+     */
+    private Object deserialize(Object data){
+        Kryo kryo = KryoFactory.createInstance(store);
+        Output out = new Output(1024*1024);
+        kryo.writeObject(out,data);
+        out.close();
+
+        Input in = new Input(out.getBuffer(), 0, out.position());
+        Object copy = kryo.readObject(in,data.getClass());
+        in.close();
+        System.out.printf("Size %d %s %n",out.position(), data);
+        return copy;
+    }
+
+    /**
+     * Creates a map with ten revisions ordered by
+     * {@link StableRevisionComparator#REVERSE}.
+     */
+    private static Map<Revision,Object> createRevisionMap(){
+        Map<Revision,Object> revisions = new TreeMap<Revision, Object>(StableRevisionComparator.REVERSE);
+        int idx = 0;
+        while (idx < 10) {
+            revisions.put(new Revision(System.currentTimeMillis() + idx, 0, 2),"foo"+idx);
+            idx++;
+        }
+        return revisions;
+    }
+
+    /**
+     * Asserts that both documents have the same creation time, keys and values.
+     */
+    private static void checkSame(NodeDocument expected, NodeDocument actual){
+        assertEquals(expected.getCreated(), actual.getCreated());
+        assertEquals(expected.keySet(), actual.keySet());
+        for(String name : expected.keySet()){
+            assertEquals(expected.get(name), actual.get(name));
+        }
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java
------------------------------------------------------------------------------
svn:eol-style = native