Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/01/23 13:35:16 UTC

svn commit: r1560666 - in /jackrabbit/oak/trunk/oak-core: ./ src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/ src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ src/test/java/...

Author: chetanm
Date: Thu Jan 23 12:35:15 2014
New Revision: 1560666

URL: http://svn.apache.org/r1560666
Log:
OAK-891 - Use DirectMemory as Level 2/Offheap cache

Adds off-heap cache support based on Kryo and DirectMemory
-- New OffHeapCache interface removes the direct dependency on DirectMemory and Kryo classes
-- Cache invalidation logic now also covers the off-heap cache content
-- Care is taken not to deserialize the complete NodeDocument just for the cache
    consistency check
-- Custom Kryo serializers provide compact storage of the NodeDocument and Revision classes
-- The off-heap cache is used only if the off-heap cache size is set to a non-zero value in MongoMK.Builder
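
For example, a repository would opt into the new cache roughly as below. This is only a
sketch based on the MongoMK.Builder changes in this commit; the MongoClient/setMongoDB
connection setup, database name and sizes are illustrative, not part of the change
(imports omitted):

    // Illustrative connection setup; host, port and db name are assumptions
    DB db = new MongoClient("localhost", 27017).getDB("oak");

    MongoMK mk = new MongoMK.Builder()
            .setMongoDB(db)
            .memoryCacheSize(64 * 1024 * 1024)    // on-heap (L1) document cache
            .offHeapCacheSize(256 * 1024 * 1024)  // a non-zero value enables the off-heap (L2) cache
            .open();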

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/pom.xml
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Thu Jan 23 12:35:15 2014
@@ -252,6 +252,20 @@
       <scope>provided</scope>
     </dependency>
 
+    <!-- Required for OffHeap storage and serialization-->
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.22</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.directmemory</groupId>
+      <artifactId>directmemory-cache</artifactId>
+      <version>0.2</version>
+      <optional>true</optional>
+    </dependency>
+
     <!-- Test Dependencies -->
     <dependency>
       <groupId>junit</groupId>

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidator.java Thu Jan 23 12:35:15 2014
@@ -19,6 +19,12 @@
 
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
@@ -34,11 +40,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
 abstract class CacheInvalidator {
     static final Logger LOG = LoggerFactory.getLogger(CacheInvalidator.class);
 
@@ -87,11 +88,12 @@ abstract class CacheInvalidator {
         @Override
         public InvalidationResult invalidateCache() {
             InvalidationResult result = new InvalidationResult();
-            Map<String, NodeDocument> cacheMap = documentStore.getCache();
-            result.cacheSize = cacheMap.size();
-            for (String key : cacheMap.keySet()) {
-                documentStore.invalidateCache(Collection.NODES, key);
+            int size = 0;
+            for (Map.Entry<String, ? extends CachedNodeDocument> e : documentStore.getCacheEntries()) {
+                size++;
+                documentStore.invalidateCache(Collection.NODES, e.getKey());
             }
+            result.cacheSize = size;
             return result;
         }
     }
@@ -107,12 +109,18 @@ abstract class CacheInvalidator {
 
         @Override
         public InvalidationResult invalidateCache() {
-            final Map<String, NodeDocument> cacheMap = documentStore.getCache();
             final InvalidationResult result = new InvalidationResult();
-            result.cacheSize = cacheMap.size();
+
+            int size  = 0;
+            List<String> cachedKeys = new ArrayList<String>();
+            for (Map.Entry<String, ? extends CachedNodeDocument> e : documentStore.getCacheEntries()) {
+                size++;
+                cachedKeys.add(e.getKey());
+            }
+            result.cacheSize = size;
 
             QueryBuilder query = QueryBuilder.start(Document.ID)
-                    .in(cacheMap.keySet());
+                    .in(cachedKeys);
 
             // Fetch only the lastRev map and id
             final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
@@ -121,12 +129,13 @@ abstract class CacheInvalidator {
             // Fetch lastRev for each such node
             DBCursor cursor = nodes.find(query.get(), keys);
             result.queryCount++;
+
             for (DBObject obj : cursor) {
                 result.cacheEntriesProcessedCount++;
                 String id = (String) obj.get(Document.ID);
                 Number modCount = (Number) obj.get(Document.MOD_COUNT);
 
-                NodeDocument cachedDoc = documentStore.getIfCached(Collection.NODES, id);
+                CachedNodeDocument cachedDoc = documentStore.getCachedNodeDoc(id);
                 if (cachedDoc != null
                         && !Objects.equal(cachedDoc.getModCount(), modCount)) {
                     documentStore.invalidateCache(Collection.NODES, id);
@@ -141,7 +150,7 @@ abstract class CacheInvalidator {
 
 
     private static class HierarchicalInvalidator extends CacheInvalidator {
-        
+
         private static final TreeTraverser<TreeNode> TRAVERSER = new TreeTraverser<TreeNode>() {
             @Override
             public Iterable<TreeNode> children(TreeNode root) {
@@ -160,11 +169,7 @@ abstract class CacheInvalidator {
         @Override
         public InvalidationResult invalidateCache() {
             final InvalidationResult result = new InvalidationResult();
-            Map<String, NodeDocument> cacheMap = documentStore.getCache();
-            TreeNode root = constructTreeFromPaths(cacheMap.keySet());
-
-            // Invalidation stats
-            result.cacheSize = cacheMap.size();
+            TreeNode root = constructTreeFromPaths(documentStore.getCacheEntries(), result);
 
             // Time at which the check is started. All NodeDocuments which
             // are found to be up-to-date would be marked touched at this time
@@ -219,7 +224,7 @@ abstract class CacheInvalidator {
                         String id = (String) obj.get(Document.ID);
 
                         final TreeNode tn2 = sameLevelNodes.get(id);
-                        NodeDocument cachedDoc = tn2.getDocument();
+                        CachedNodeDocument cachedDoc = tn2.getDocument();
                         if (cachedDoc != null) {
                             boolean noChangeInModCount = Objects.equal(latestModCount, cachedDoc.getModCount());
                             if (noChangeInModCount) {
@@ -256,11 +261,19 @@ abstract class CacheInvalidator {
             return result;
         }
 
-        private TreeNode constructTreeFromPaths(Set<String> ids) {
+        private TreeNode constructTreeFromPaths(Iterable<? extends Map.Entry<String, ? extends CachedNodeDocument>> entries,
+                                                InvalidationResult result) {
             TreeNode root = new TreeNode("");
-            for (String id : ids) {
+            for (Map.Entry<String, ? extends CachedNodeDocument> e : entries) {
                 TreeNode current = root;
-                String path = Utils.getPathFromId(id);
+
+                //TODO Split documents are immutable and hence need not
+                //be checked
+                //TODO Need a way to determine whether the
+                //key refers to a split document
+
+                String path = Utils.getPathFromId(e.getKey());
+                result.cacheSize++;
                 for (String name : PathUtils.elements(path)) {
                     current = current.child(name);
                 }
@@ -329,12 +342,12 @@ abstract class CacheInvalidator {
                 documentStore.invalidateCache(Collection.NODES, getId());
             }
 
-            public NodeDocument getDocument() {
-                return documentStore.getIfCached(Collection.NODES, id);
+            public CachedNodeDocument getDocument() {
+                return documentStore.getCachedNodeDoc(id);
             }
 
             public boolean isUptodate(long time) {
-                NodeDocument doc = documentStore.getIfCached(Collection.NODES, id);
+                CachedNodeDocument doc = documentStore.getCachedNodeDoc(id);
                 if (doc != null) {
                     return doc.isUpToDate(time);
                 } else {
@@ -346,7 +359,7 @@ abstract class CacheInvalidator {
             }
 
             public void markUptodate(long cacheCheckTime) {
-                NodeDocument doc = getDocument();
+                CachedNodeDocument doc = getDocument();
                 if (doc == null) {
                     return;
                 }
@@ -358,7 +371,7 @@ abstract class CacheInvalidator {
                 return id;
             }
 
-            private void markUptodate(long cacheCheckTime, NodeDocument upToDateRoot) {
+            private void markUptodate(long cacheCheckTime, CachedNodeDocument upToDateRoot) {
                 for (TreeNode tn : children.values()) {
                     tn.markUptodate(cacheCheckTime, upToDateRoot);
                 }
@@ -366,8 +379,8 @@ abstract class CacheInvalidator {
                 markUptodate(getId(), cacheCheckTime, upToDateRoot);
             }
 
-            private void markUptodate(String key, long time, NodeDocument upToDateRoot) {
-                NodeDocument doc = documentStore.getIfCached(Collection.NODES, key);
+            private void markUptodate(String key, long time, CachedNodeDocument upToDateRoot) {
+                CachedNodeDocument doc = documentStore.getCachedNodeDoc(key);
 
                 if (doc == null) {
                     return;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoDocumentStore.java Thu Jan 23 12:35:15 2014
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,16 +34,21 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Splitter;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.mongomk.UpdateOp.Operation;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.ForwardingListener;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.NodeDocOffHeapCache;
+import org.apache.jackrabbit.oak.plugins.mongomk.cache.OffHeapCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Splitter;
 import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 import com.mongodb.BasicDBObject;
@@ -106,10 +113,30 @@ public class MongoDocumentStore implemen
         nodes.ensureIndex(index, options);
 
         // TODO expire entries if the parent was changed
-        nodesCache = builder.buildCache(builder.getDocumentCacheSize());
+        if(builder.useOffHeapCache()){
+            nodesCache = createOffHeapCache(builder);
+        }else{
+            nodesCache = builder.buildCache(builder.getDocumentCacheSize());
+        }
+
         cacheStats = new CacheStats(nodesCache, "MongoMk-Documents", builder.getWeigher(),
                 builder.getDocumentCacheSize());
     }
+
+    private Cache<String , NodeDocument> createOffHeapCache(MongoMK.Builder builder){
+        ForwardingListener<String , NodeDocument> listener = ForwardingListener.newInstance();
+
+        Cache<String,NodeDocument> primaryCache = CacheBuilder.newBuilder()
+                .weigher(builder.getWeigher())
+                .maximumWeight(builder.getDocumentCacheSize())
+                .removalListener(listener)
+                .recordStats()
+                .build();
+
+        Cache<String,NodeDocument> cache =
+                new NodeDocOffHeapCache( primaryCache, listener, builder, this );
+        return cache;
+    }
     
     private static long start() {
         return LOG_TIME ? System.currentTimeMillis() : 0;
@@ -568,14 +595,35 @@ public class MongoDocumentStore implemen
             LOG.debug("MongoDB time: " + timeSum);
         }
         nodes.getDB().getMongo().close();
+
+        if(nodesCache instanceof Closeable){
+            try {
+                ((Closeable)nodesCache).close();
+            } catch (IOException e) {
+
+                LOG.warn("Error occurred while closing Off Heap Cache",e);
+            }
+        }
     }
 
     public CacheStats getCacheStats() {
         return cacheStats;
     }
 
-    Map<String, NodeDocument> getCache() {
-        return Collections.unmodifiableMap(nodesCache.asMap());
+    Iterable<? extends Map.Entry<String, ? extends CachedNodeDocument>> getCacheEntries() {
+        if(nodesCache instanceof OffHeapCache){
+            return Iterables.concat(nodesCache.asMap().entrySet(),
+                    ((OffHeapCache)nodesCache).offHeapEntriesMap().entrySet());
+        }
+        return nodesCache.asMap().entrySet();
+    }
+
+    CachedNodeDocument getCachedNodeDoc(String id){
+        if(nodesCache instanceof OffHeapCache){
+            return  ((OffHeapCache) nodesCache).getCachedDocument(id);
+        }
+
+        return nodesCache.getIfPresent(id);
     }
 
     private static void log(String message, Object... args) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Thu Jan 23 12:35:15 2014
@@ -661,6 +661,7 @@ public class MongoMK implements MicroKer
         private long docChildrenCacheSize;
         private boolean useSimpleRevision;
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
+        private long offHeapCacheSize = -1;
 
         public Builder() {
             memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -846,6 +847,19 @@ public class MongoMK implements MicroKer
             return splitDocumentAgeMillis;
         }
 
+        public boolean useOffHeapCache() {
+            return this.offHeapCacheSize > 0;
+        }
+
+        public long getOffHeapCacheSize() {
+            return offHeapCacheSize;
+        }
+
+        public Builder offHeapCacheSize(long offHeapCacheSize) {
+            this.offHeapCacheSize = offHeapCacheSize;
+            return this;
+        }
+
         /**
          * Open the MongoMK instance using the configured options.
          * 
@@ -855,13 +869,6 @@ public class MongoMK implements MicroKer
             return new MongoMK(this);
         }
         
-        /**
-         * Create a cache.
-         * 
-         * @param <V> the value type
-         * @param maxWeight
-         * @return the cache
-         */
         public <V extends CacheValue> Cache<String, V> buildCache(long maxWeight) {
             if (LIRS_CACHE) {
                 return CacheLIRS.newBuilder().weigher(weigher).

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/**
+ * Listener which forwards removal notifications to a delegate that can be set
+ * after construction. It is used to bridge instances whose creation order is
+ * circular, e.g. a cache that needs a removal listener before the eventual
+ * listener instance exists.
+ */
+public class ForwardingListener<K, V>
+        implements RemovalListener<K, V> {
+    private RemovalListener<K, V> delegate;
+
+    public ForwardingListener() {
+    }
+
+    public ForwardingListener(RemovalListener<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<K, V> notification) {
+        if (delegate != null) {
+            delegate.onRemoval(notification);
+        }
+    }
+
+    public void setDelegate(RemovalListener<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    public static <K, V> ForwardingListener<K, V> newInstance() {
+        return new ForwardingListener<K, V>();
+    }
+
+    public static <K, V> ForwardingListener<K, V> newInstance(RemovalListener<K, V> delegate) {
+        return new ForwardingListener<K, V>(delegate);
+    }
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/ForwardingListener.java
------------------------------------------------------------------------------
    svn:eol-style = native
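
A sketch of the wiring this listener enables, mirroring MongoDocumentStore.createOffHeapCache
above (the maximum size is illustrative; the commit uses a weigher and maximum weight instead):

    // The primary (L1) cache must be built with a removal listener, but the L2 cache
    // that will consume those removals is only created afterwards, so a
    // ForwardingListener stands in and the real delegate is set later.
    ForwardingListener<String, NodeDocument> listener = ForwardingListener.newInstance();

    Cache<String, NodeDocument> primary = CacheBuilder.newBuilder()
            .maximumSize(10000)
            .removalListener(listener)
            .recordStats()
            .build();

    // NodeDocOffHeapCache then registers itself in its constructor:
    //   forwardingListener.setDelegate(new PrimaryRemovalListener());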

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.NavigableMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+
+/**
+ * Factory to create Kryo instances customized for managing NodeDocument
+ */
+public class KryoFactory {
+
+    public static Kryo createInstance(DocumentStore documentStore){
+        Kryo kryo = new Kryo();
+        kryo.setReferences(false);
+        kryo.register(Revision.class, new Serializers.RevisionSerizlizer());
+        kryo.register(NodeDocument.class, new Serializers.NodeDocumentSerializer(documentStore));
+        kryo.register(NavigableMap.class);
+
+        //All the required classes need to be registered explicitly
+        kryo.setRegistrationRequired(true);
+        return kryo;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoSerializer implements Closeable {
+    /* buffer size */
+    private static final int BUFFER_SIZE = 1024;
+
+    private final KryoPool pool;
+
+    public KryoSerializer(KryoPool pool) {
+        this.pool = pool;
+    }
+
+    public <T> byte[] serialize(T obj)
+            throws IOException {
+        KryoHolder kh = null;
+        try {
+            kh = pool.get();
+            kh.reset();
+            kh.kryo.writeObject(kh.output, obj);
+            return kh.output.toBytes();
+        } finally {
+            if (kh != null) {
+                pool.done(kh);
+            }
+        }
+    }
+
+
+    public <T> T deserialize(byte[] source, Class<T> clazz)
+            throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+        KryoHolder kh = null;
+        try {
+            kh = pool.get();
+            Input input = new Input(source);
+            return kh.kryo.readObject(input, clazz);
+        } finally {
+            if (kh != null) {
+                pool.done(kh);
+            }
+        }
+    }
+
+    /**
+     * Closes the pool, releasing any Kryo instances associated with it
+     */
+    @Override
+    public void close() throws IOException {
+        pool.close();
+    }
+
+    private static class KryoHolder {
+        final Kryo kryo;
+        final Output output = new Output(BUFFER_SIZE, -1);
+
+        KryoHolder(Kryo kryo) {
+            this.kryo = kryo;
+        }
+
+        private void reset() {
+            output.clear();
+        }
+    }
+
+    public static class KryoPool {
+        private final Queue<KryoHolder> objects = new ConcurrentLinkedQueue<KryoHolder>();
+
+        public KryoHolder get() {
+            KryoHolder kh;
+            if ((kh = objects.poll()) == null) {
+                kh = new KryoHolder(createInstance());
+            }
+            return kh;
+        }
+
+        public void done(KryoHolder kh) {
+            objects.offer(kh);
+        }
+
+        public void close() {
+            objects.clear();
+        }
+
+        /**
+         * Subclasses can customize the Kryo instance by overriding this method
+         *
+         * @return the created Kryo instance
+         */
+        protected Kryo createInstance() {
+            Kryo kryo = new Kryo();
+            kryo.setReferences(false);
+            return kryo;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/KryoSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native
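
A short usage sketch of the serializer and its pool. The plain KryoPool below only handles
types Kryo can serialize out of the box; NodeDocOffHeapCache (next file) plugs in an
Oak-specific pool built from KryoFactory instead:

    KryoSerializer serializer = new KryoSerializer(new KryoSerializer.KryoPool());
    try {
        byte[] bytes = serializer.serialize("some value");          // any Kryo-serializable object
        String copy = serializer.deserialize(bytes, String.class);  // round trip
    } catch (Exception e) {
        // serialize/deserialize declare IOException and reflection exceptions
    } finally {
        try {
            serializer.close();   // drops the pooled Kryo instances
        } catch (IOException ignore) {
        }
    }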

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.ForwardingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.directmemory.measures.Ram;
+import org.apache.directmemory.memory.MemoryManagerService;
+import org.apache.directmemory.memory.MemoryManagerServiceImpl;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.cache.CacheValue;
+import org.apache.jackrabbit.oak.plugins.mongomk.CachedNodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MongoMK;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.cache.AbstractCache.SimpleStatsCounter;
+import static com.google.common.cache.AbstractCache.StatsCounter;
+
+public class NodeDocOffHeapCache
+        extends ForwardingCache.SimpleForwardingCache<String, NodeDocument>
+        implements Closeable, OffHeapCache {
+    private final StatsCounter statsCounter = new SimpleStatsCounter();
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final Cache<String, NodeDocReference> offHeapCache;
+    private final CacheStats offHeapCacheStats;
+
+    private final MemoryManagerService<NodeDocument> memoryManager;
+
+    private final KryoSerializer serializer;
+
+    public NodeDocOffHeapCache(Cache<String, NodeDocument> delegate,
+                               ForwardingListener<String, NodeDocument> forwardingListener,
+                               MongoMK.Builder builder,
+                               DocumentStore documentStore) {
+        super(delegate);
+        forwardingListener.setDelegate(new PrimaryRemovalListener());
+
+        final long maxMemory = builder.getOffHeapCacheSize();
+
+        //TODO We may also expire the entries from cache if not accessed for some time
+        offHeapCache = CacheBuilder.newBuilder()
+                .weigher(builder.getWeigher())
+                .maximumWeight(maxMemory)
+                .removalListener(new SecondaryRemovalListener())
+                .recordStats()
+                .build();
+
+        offHeapCacheStats = new CacheStats(offHeapCache, "MongoMk-Documents-L2", builder.getWeigher(),
+                builder.getOffHeapCacheSize());
+
+        final long bufferSize = Ram.Gb(1);
+        int noOfBuffers = Math.max(1, (int) (maxMemory / bufferSize));
+        int buffSize = (int) Math.min(maxMemory, bufferSize);
+
+        //TODO Check if UnsafeMemoryManagerServiceImpl should be preferred
+        //on Sun/Oracle JDK
+        memoryManager = new MemoryManagerServiceImpl<NodeDocument>();
+        memoryManager.init(noOfBuffers, buffSize);
+
+        serializer = new KryoSerializer(new OakKryoPool(documentStore));
+    }
+
+    @Override
+    public NodeDocument getIfPresent(Object key) {
+        NodeDocument result = super.getIfPresent(key);
+        if (result == null) {
+            result = retrieve(key, false);
+        }
+        return result;
+    }
+
+
+    @Override
+    public NodeDocument get(final String key, final Callable<? extends NodeDocument> valueLoader)
+            throws ExecutionException {
+        return super.get(key, new Callable<NodeDocument>() {
+            @Override
+            public NodeDocument call()
+                    throws Exception {
+                //Check in offHeap first
+                NodeDocument result = retrieve(key, true);
+
+                //Not found in L2 then load
+                if (result == null) {
+                    result = valueLoader.call();
+                }
+                return result;
+            }
+        });
+    }
+
+    @Override
+    public ImmutableMap<String, NodeDocument> getAllPresent(Iterable<?> keys) {
+        @SuppressWarnings("unchecked") List<String> list = Lists.newArrayList((Iterable<String>) keys);
+        ImmutableMap<String, NodeDocument> result = super.getAllPresent(list);
+
+        //If all the requested keys were found there is
+        //no need to check L2
+        if (result.size() == list.size()) {
+            return result;
+        }
+
+        //Look up value from L2
+        Map<String, NodeDocument> r2 = Maps.newHashMap(result);
+        for (String key : list) {
+            if (!result.containsKey(key)) {
+                NodeDocument val = retrieve(key, false);
+                if (val != null) {
+                    r2.put(key, val);
+                }
+            }
+        }
+        return ImmutableMap.copyOf(r2);
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        super.invalidate(key);
+        offHeapCache.invalidate(key);
+    }
+
+    @Override
+    public void invalidateAll(Iterable<?> keys) {
+        super.invalidateAll(keys);
+        offHeapCache.invalidateAll(keys);
+    }
+
+    @Override
+    public void invalidateAll() {
+        super.invalidateAll();
+        offHeapCache.invalidateAll();
+    }
+
+    @Override
+    public void close() throws IOException {
+        memoryManager.close();
+        serializer.close();
+    }
+
+    @Override
+    public Map<String, ? extends CachedNodeDocument> offHeapEntriesMap() {
+        return Collections.unmodifiableMap(offHeapCache.asMap());
+    }
+
+    @Override
+    public CacheStats getCacheStats() {
+        return offHeapCacheStats;
+    }
+
+    @Nullable
+    @Override
+    public CachedNodeDocument getCachedDocument(String id) {
+        NodeDocument doc = super.getIfPresent(id);
+        if (doc != null) {
+            return doc;
+        }
+        return offHeapCache.getIfPresent(id);
+    }
+
+    /**
+     * Retrieves the value from the off heap cache.
+     *
+     * @param key                     cache entry key to retrieve
+     * @param invalidateAfterRetrieve set to true if the entry has to be invalidated in the
+     *                                off heap cache after retrieval. This is the case when the
+     *                                loaded value is made part of the L1 cache
+     */
+    private NodeDocument retrieve(Object key, boolean invalidateAfterRetrieve) {
+        Stopwatch watch = Stopwatch.createStarted();
+
+        NodeDocReference value = offHeapCache.getIfPresent(key);
+        if (value == null) {
+            statsCounter.recordMisses(1);
+            return null;
+        }
+
+        NodeDocument result = value.getDocument();
+        if (result != null) {
+            statsCounter.recordLoadSuccess(watch.elapsed(TimeUnit.NANOSECONDS));
+        } else {
+            statsCounter.recordMisses(1);
+        }
+
+        if (invalidateAfterRetrieve) {
+            //The value would be made part of L1 cache so no need to keep it
+            //in backend
+            offHeapCache.invalidate(key);
+        }
+
+        return result;
+    }
+
+    private class PrimaryRemovalListener implements RemovalListener<String, NodeDocument> {
+
+        @Override
+        public void onRemoval(RemovalNotification<String, NodeDocument> n) {
+            //If removed explicitly then we clear from L2
+            if (n.getCause() == RemovalCause.EXPLICIT
+                    || n.getCause() == RemovalCause.REPLACED) {
+                offHeapCache.invalidate(n.getKey());
+            }
+
+            //If removed because of size then we move it to
+            //L2
+            if (n.getCause() == RemovalCause.SIZE) {
+                NodeDocument doc = n.getValue();
+                if (doc != NodeDocument.NULL) {
+                    offHeapCache.put(n.getKey(), new NodeDocReference(n.getKey(), doc));
+                }
+            }
+        }
+    }
+
+    private class SecondaryRemovalListener implements RemovalListener<String, NodeDocReference> {
+        @Override
+        public void onRemoval(RemovalNotification<String, NodeDocReference> notification) {
+            NodeDocReference doc = notification.getValue();
+            if (doc != null && doc.getPointer() != null) {
+                memoryManager.free(doc.getPointer());
+            }
+        }
+    }
+
+    private class NodeDocReference implements CachedNodeDocument, CacheValue {
+        private final Number modCount;
+        private final long created;
+        private final AtomicLong lastCheckTime;
+        private final Pointer<NodeDocument> documentPointer;
+        private final String key;
+
+        public NodeDocReference(String key, NodeDocument doc) {
+            this.modCount = doc.getModCount();
+            this.created = doc.getCreated();
+            this.lastCheckTime = new AtomicLong(doc.getLastCheckTime());
+            this.documentPointer = serialize(doc);
+            this.key = key;
+        }
+
+        @Override
+        public Number getModCount() {
+            return modCount;
+        }
+
+        @Override
+        public long getCreated() {
+            return created;
+        }
+
+        @Override
+        public long getLastCheckTime() {
+            return lastCheckTime.get();
+        }
+
+        @Override
+        public void markUpToDate(long checkTime) {
+            lastCheckTime.set(checkTime);
+        }
+
+        @Override
+        public boolean isUpToDate(long lastCheckTime) {
+            return lastCheckTime <= this.lastCheckTime.get();
+        }
+
+        @CheckForNull
+        public NodeDocument getDocument() {
+            return deserialize(documentPointer);
+        }
+
+        @CheckForNull
+        public Pointer<NodeDocument> getPointer() {
+            return documentPointer;
+        }
+
+        @CheckForNull
+        private Pointer<NodeDocument> serialize(NodeDocument doc) {
+            try {
+                byte[] payload = serializer.serialize(doc);
+                Pointer<NodeDocument> ptr = memoryManager.store(payload, 0);
+                ptr.setClazz(NodeDocument.class);
+                return ptr;
+            } catch (IOException e) {
+                log.warn("Not able to serialize doc {}", doc.getId(), e);
+                return null;
+            }
+        }
+
+        @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+        @CheckForNull
+        private NodeDocument deserialize(@CheckForNull Pointer<NodeDocument> pointer) {
+            try {
+                //If there was some error in serializing then pointer
+                // would be null
+                if (pointer == null) {
+                    return null;
+                }
+
+                //TODO Look for a way to have a direct access to MemoryManager buffer
+                //for Kryo so that no copying is involved
+
+                final byte[] value;
+
+                //Workaround for DIRECTMEMORY-137 Concurrent access via same pointer
+                //can lead to issues. For now synchronizing on the pointer
+                synchronized (pointer) {
+                    value = memoryManager.retrieve(pointer);
+                }
+
+                NodeDocument doc = serializer.deserialize(value, pointer.getClazz());
+                doc.markUpToDate(getLastCheckTime());
+                return doc;
+            } catch (Exception e) {
+                log.warn("Not able to deserialize doc {} with pointer {}", new Object[]{key, pointer, e});
+            }
+            return null;
+        }
+
+
+        @Override
+        public int getMemory() {
+            int result = 168;
+
+            if (documentPointer != null) {
+                result += (int) documentPointer.getSize();
+            }
+            return result;
+        }
+    }
+
+    private static class OakKryoPool extends KryoSerializer.KryoPool {
+        private final DocumentStore documentStore;
+
+        public OakKryoPool(DocumentStore documentStore) {
+            this.documentStore = documentStore;
+        }
+
+        @Override
+        protected Kryo createInstance() {
+            return KryoFactory.createInstance(documentStore);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/NodeDocOffHeapCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.cache.Cache;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.mongomk.CachedNodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+
+/**
+ * An off-heap cache manages its cache values in off-heap storage.
+ *
+ * This interface is required to avoid a direct dependency on the DirectMemory
+ * and Kryo classes
+ */
+public interface OffHeapCache extends Cache<String, NodeDocument> {
+
+    Map<String, ? extends CachedNodeDocument> offHeapEntriesMap();
+
+    CacheStats getCacheStats();
+
+    @Nullable
+    CachedNodeDocument getCachedDocument(String id);
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/OffHeapCache.java
------------------------------------------------------------------------------
    svn:eol-style = native
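
Callers keep working against the plain Guava Cache type; the additional off-heap operations
are reached through an instanceof check, as MongoDocumentStore does above. A minimal sketch
(the document id is illustrative):

    if (nodesCache instanceof OffHeapCache) {
        OffHeapCache l2 = (OffHeapCache) nodesCache;
        CacheStats l2Stats = l2.getCacheStats();                  // stats for the off-heap level only
        CachedNodeDocument doc = l2.getCachedDocument("1:/foo");  // metadata view, no full deserialization
    }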

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class Serializers {
+    /**
+     * The serialization and deserialization logic needs to maintain the same order
+     * of reads and writes
+     */
+
+    public static class RevisionSerizlizer extends Serializer<Revision> {
+        @Override
+        public void write(Kryo kryo, Output o, Revision r) {
+            o.writeLong(r.getTimestamp(), true);
+            o.writeInt(r.getCounter(), true);
+            o.writeInt(r.getClusterId(), true);
+            o.writeBoolean(r.isBranch());
+        }
+
+        @Override
+        public Revision read(Kryo kryo, Input i, Class<Revision> revisionClass) {
+            return new Revision(
+                    i.readLong(true), //timestamp
+                    i.readInt(true),  //counter
+                    i.readInt(true),  //clusterId
+                    i.readBoolean() //branch
+            );
+        }
+    }
+
+    public static class NodeDocumentSerializer extends Serializer<NodeDocument> {
+        private final DocumentStore documentStore;
+
+        public NodeDocumentSerializer(DocumentStore documentStore) {
+            this.documentStore = documentStore;
+        }
+
+        @Override
+        public void write(Kryo kryo, Output o, NodeDocument doc) {
+            checkArgument(doc.isSealed(), "Cannot serialize a non-sealed document [%s]", doc.getId());
+            o.writeLong(doc.getCreated(), true);
+
+            Set<String> keys = doc.keySet();
+            o.writeInt(keys.size(), true);
+
+            //The assumption here is that the data only contains values of the following types:
+            //- primitive wrappers
+            //- NavigableMap of Revision -> value
+            for (String key : doc.keySet()) {
+                o.writeString(key);
+                Object val = doc.get(key);
+                if (val instanceof NavigableMap) {
+                    kryo.writeClass(o, NavigableMap.class);
+                    new RevisionedMapSerializer(kryo).write(kryo, o, (Map) val);
+                } else {
+                    kryo.writeClass(o, val.getClass());
+                    kryo.writeObject(o, val);
+                }
+            }
+        }
+
+        @Override
+        public NodeDocument read(Kryo kryo, Input input, Class<NodeDocument> nodeDocumentClass) {
+            long created = input.readLong(true);
+
+            int mapSize = input.readInt(true);
+            NodeDocument doc = new NodeDocument(documentStore, created);
+            for (int i = 0; i < mapSize; i++) {
+                String key = input.readString();
+                Registration reg = kryo.readClass(input);
+                Object value;
+                if (reg.getType() == NavigableMap.class) {
+                    value = new RevisionedMapSerializer(kryo).read(kryo, input, Map.class);
+                } else {
+                    value = kryo.readObject(input, reg.getType());
+                }
+                doc.put(key, value);
+            }
+
+            //Seal the doc once all changes done
+            doc.seal();
+
+            return doc;
+        }
+
+    }
+
+    private static class RevisionedMapSerializer extends MapSerializer {
+
+        public RevisionedMapSerializer(Kryo kryo) {
+            setKeysCanBeNull(false);
+            setKeyClass(Revision.class, kryo.getSerializer(Revision.class));
+        }
+
+        @SuppressWarnings("unchecked")
+        protected Map create(Kryo kryo, Input input, Class<Map> type) {
+            return new TreeMap(StableRevisionComparator.REVERSE);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/Serializers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java?rev=1560666&r1=1560665&r2=1560666&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CacheInvalidationIT.java Thu Jan 23 12:35:15 2014
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.plugins.mongomk;
 
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.mongomk.util.MongoConnection;
@@ -66,7 +67,7 @@ public class CacheInvalidationIT extends
         createTree(root,paths);
         c1.merge(root, EmptyHook.INSTANCE, null);
 
-        assertEquals(totalPaths,ds(c1).getCache().size());
+        assertEquals(totalPaths, Iterables.size(ds(c1).getCacheEntries()));
 
         runBgOps(c1,c2);
         return totalPaths;
@@ -89,7 +90,7 @@ public class CacheInvalidationIT extends
         //Only 2 entries /a and /a/d would be invalidated
         // '/' would have been added to cache in start of backgroundRead
         //itself
-        assertEquals(totalPaths - 2,ds(c1).getCache().size());
+        assertEquals(totalPaths - 2,Iterables.size(ds(c1).getCacheEntries()));
     }
 
     @Test

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java?rev=1560666&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java Thu Jan 23 12:35:15 2014
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.mongomk.cache;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Ordering;
+import org.apache.jackrabbit.oak.plugins.mongomk.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.mongomk.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.mongomk.Revision;
+import org.apache.jackrabbit.oak.plugins.mongomk.StableRevisionComparator;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SerializerTest {
+    private DocumentStore store = new MemoryDocumentStore();
+
+    @Test
+    public void revisionSerialization() {
+        Revision r = new Revision(System.currentTimeMillis(), 1, 5);
+        assertEquals(r, deserialize(r));
+
+        r = new Revision(System.currentTimeMillis(), 1, 5, true);
+        assertEquals(r, deserialize(r));
+    }
+
+    @Test
+    public void nodeDocSerialization() {
+        long time = System.currentTimeMillis();
+        NodeDocument doc = new NodeDocument(store,time);
+        doc.seal();
+        checkSame(doc, (NodeDocument) deserialize(doc));
+
+        doc = new NodeDocument(store,time);
+        doc.put("_id","b1");
+        doc.put("a2","b2");
+        doc.seal();
+        checkSame(doc, (NodeDocument) deserialize(doc));
+
+        doc = new NodeDocument(store,time);
+        doc.put("_id","b1");
+        doc.put("a2",createRevisionMap());
+        doc.put("a3",createRevisionMap());
+        doc.seal();
+
+        NodeDocument deserDoc = (NodeDocument) deserialize(doc);
+        checkSame(doc, deserDoc);
+
+        //Assert that revision keys are sorted
+        NavigableMap<Revision,Object> values = (NavigableMap<Revision, Object>) deserDoc.get("a2");
+        assertTrue(Ordering.from(StableRevisionComparator.REVERSE).isOrdered(values.keySet()));
+    }
+
+    private Object deserialize(Object data){
+        Kryo k = KryoFactory.createInstance(store);
+        Output o = new Output(1024*1024);
+        k.writeObject(o,data);
+        o.close();
+
+        Input input = new Input(o.getBuffer(), 0, o.position());
+        Object result = k.readObject(input,data.getClass());
+        input.close();
+        System.out.printf("Size %d %s %n",o.position(), data);
+        return result;
+    }
+
+    private static Map<Revision,Object> createRevisionMap(){
+        Map<Revision,Object> map = new TreeMap<Revision, Object>(StableRevisionComparator.REVERSE);
+        for(int i = 0; i < 10; i++){
+            map.put(new Revision(System.currentTimeMillis() + i, 0, 2),"foo"+i);
+        }
+        return map;
+    }
+
+    private static void checkSame(NodeDocument d1, NodeDocument d2){
+        assertEquals(d1.getCreated(), d2.getCreated());
+        assertEquals(d1.keySet(), d2.keySet());
+        for(String key : d1.keySet()){
+            assertEquals(d1.get(key), d2.get(key));
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/cache/SerializerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native