You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/12/21 10:59:23 UTC

git commit: performance improvements for clustered caches

Updated Branches:
  refs/heads/develop 9a7dbd295 -> b454dd730


performance improvements for clustered caches


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/b454dd73
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/b454dd73
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/b454dd73

Branch: refs/heads/develop
Commit: b454dd7308afde48611e616da45e13747bfd5c4a
Parents: 9a7dbd2
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Sat Dec 21 10:59:08 2013 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Sat Dec 21 10:59:08 2013 +0100

----------------------------------------------------------------------
 .../marmotta/kiwi/caching/KiWiCacheManager.java |  12 +-
 .../kiwi/caching/TripleExternalizer.java        | 109 +++++++++++++++++++
 .../kiwi/persistence/KiWiConnection.java        |  26 +++--
 .../kiwi/persistence/KiWiPersistence.java       |   7 +-
 .../registry/CacheTripleRegistry.java           |   8 +-
 5 files changed, 144 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/b454dd73/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
index 0b250de..ba8bfc0 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java
@@ -19,6 +19,7 @@ package org.apache.marmotta.kiwi.caching;
 
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.infinispan.Cache;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
 import org.infinispan.configuration.cache.CacheMode;
 import org.infinispan.configuration.cache.Configuration;
 import org.infinispan.configuration.cache.ConfigurationBuilder;
@@ -72,7 +73,7 @@ public class KiWiCacheManager {
      *
      * @param config
      */
-    public KiWiCacheManager(KiWiConfiguration config) {
+    public KiWiCacheManager(KiWiConfiguration config, AdvancedExternalizer...externalizers) {
 
         this.clustered = config.isClustered();
         this.kiWiConfiguration = config;
@@ -88,9 +89,12 @@ public class KiWiCacheManager {
                     .globalJmxStatistics()
                         .jmxDomain("org.apache.marmotta.kiwi")
                         .allowDuplicateDomains(true)
+                    .serialization()
+                        .addAdvancedExternalizer(externalizers)
                     .build();
 
 
+
             defaultConfiguration = new ConfigurationBuilder()
                     .clustering()
                         .cacheMode(CacheMode.DIST_ASYNC)
@@ -144,7 +148,7 @@ public class KiWiCacheManager {
      * @param cacheManager
      * @param kiWiConfiguration
      */
-    public KiWiCacheManager(EmbeddedCacheManager cacheManager, KiWiConfiguration kiWiConfiguration) {
+    public KiWiCacheManager(EmbeddedCacheManager cacheManager, KiWiConfiguration kiWiConfiguration, AdvancedExternalizer...externalizers) {
         this.cacheManager = cacheManager;
         this.globalConfiguration = cacheManager.getCacheManagerConfiguration();
         this.defaultConfiguration = cacheManager.getDefaultCacheConfiguration();
@@ -152,6 +156,10 @@ public class KiWiCacheManager {
 
         this.clustered = kiWiConfiguration.isClustered();
 
+        for(AdvancedExternalizer e : externalizers) {
+            this.globalConfiguration.serialization().advancedExternalizers().put(e.getId(), e);
+        }
+
         log.info("initialised cache manager ({})", globalConfiguration.isClustered() ? "cluster name: "+globalConfiguration.transport().clusterName() : "single host");
 
         this.embedded = false;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b454dd73/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/TripleExternalizer.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/TripleExternalizer.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/TripleExternalizer.java
new file mode 100644
index 0000000..737d331
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/TripleExternalizer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.caching;
+
+import org.apache.marmotta.kiwi.model.rdf.KiWiResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.persistence.KiWiPersistence;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
+import org.infinispan.commons.util.Util;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.Set;
+
+/**
+ * An externalizer for Infinispan allowing to more efficiently transport triples by only serializing the node
+ * IDs instead of the whole nodes.
+ */
+public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
+
+    private KiWiPersistence persistence;
+
+    public TripleExternalizer(KiWiPersistence persistence) {
+        this.persistence = persistence;
+    }
+
+    @Override
+    public Set<Class<? extends KiWiTriple>> getTypeClasses() {
+        return Util.<Class<? extends KiWiTriple>>asSet(KiWiTriple.class);
+    }
+
+    @Override
+    public Integer getId() {
+        return 13;
+    }
+
+    @Override
+    public void writeObject(ObjectOutput output, KiWiTriple object) throws IOException {
+        output.writeLong(object.getId());
+        output.writeLong(object.getSubject().getId());
+        output.writeLong(object.getPredicate().getId());
+        output.writeLong(object.getObject().getId());
+        output.writeLong(object.getContext() != null ? object.getContext().getId() : -1L);
+        output.writeLong(object.getCreator() != null ? object.getCreator().getId() : -1L);
+        output.writeBoolean(object.isDeleted());
+        output.writeBoolean(object.isInferred());
+        output.writeBoolean(object.isNewTriple());
+        output.writeObject(object.getCreated());
+        output.writeObject(object.getDeletedAt());
+    }
+
+    @Override
+    public KiWiTriple readObject(ObjectInput input) throws IOException, ClassNotFoundException {
+        try {
+            KiWiConnection con = persistence.getConnection();
+            try {
+                KiWiTriple result = new KiWiTriple();
+                result.setId(input.readLong());
+                result.setSubject((KiWiResource) con.loadNodeById(input.readLong()));
+                result.setPredicate((KiWiUriResource) con.loadNodeById(input.readLong()));
+                result.setObject(con.loadNodeById(input.readLong()));
+
+                long contextId = input.readLong();
+                if(contextId > 0) {
+                    result.setContext((KiWiResource) con.loadNodeById(contextId));
+                }
+                long creatorId = input.readLong();
+                if(creatorId > 0) {
+                    result.setCreator((KiWiResource) con.loadNodeById(creatorId));
+                }
+
+                result.setDeleted(input.readBoolean());
+                result.setInferred(input.readBoolean());
+                result.setNewTriple(input.readBoolean());
+
+                result.setCreated((Date) input.readObject());
+                result.setDeletedAt((Date) input.readObject());
+
+
+                return result;
+            } finally {
+                con.commit();
+                con.close();
+            }
+        } catch (SQLException ex) {
+            throw new IOException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b454dd73/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 2022c46..7bcd06d 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -18,6 +18,8 @@
 package org.apache.marmotta.kiwi.persistence;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import info.aduna.iteration.*;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.marmotta.commons.sesame.model.LiteralCommons;
@@ -51,7 +53,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * <p/>
  * Author: Sebastian Schaffert
  */
-public class KiWiConnection {
+public class KiWiConnection implements AutoCloseable {
 
     private static Logger log = LoggerFactory.getLogger(KiWiConnection.class);
 
@@ -80,13 +82,13 @@ public class KiWiConnection {
     /**
      * Cache URI resources by uri
      */
-    private Cache<String,KiWiUriResource> uriCache;
+    private Cache<Long,KiWiUriResource> uriCache;
 
 
     /**
      * Cache BNodes by BNode ID
      */
-    private Cache<String,KiWiAnonResource> bnodeCache;
+    private Cache<Long,KiWiAnonResource> bnodeCache;
 
     /**
      * Cache literals by literal cache key (LiteralCommons#createCacheKey(String,Locale,URI))
@@ -492,7 +494,7 @@ public class KiWiConnection {
         Preconditions.checkNotNull(uri);
 
         // look in cache
-        KiWiUriResource element = uriCache.get(uri);
+        KiWiUriResource element = uriCache.get(createCacheKey(uri));
         if(element != null) {
             return element;
         }
@@ -538,7 +540,7 @@ public class KiWiConnection {
      */
     public KiWiAnonResource loadAnonResource(String id) throws SQLException {
         // look in cache
-        KiWiAnonResource element = bnodeCache.get(id);
+        KiWiAnonResource element = bnodeCache.get(createCacheKey(id));
         if(element != null) {
             return element;
         }
@@ -1773,9 +1775,9 @@ public class KiWiConnection {
             nodeCache.putForExternalRead(node.getId(), node);
         }
         if(node instanceof KiWiUriResource) {
-            uriCache.putForExternalRead(node.stringValue(), (KiWiUriResource) node);
+            uriCache.putForExternalRead(createCacheKey(node.stringValue()), (KiWiUriResource) node);
         } else if(node instanceof KiWiAnonResource) {
-            bnodeCache.putForExternalRead(node.stringValue(), (KiWiAnonResource) node);
+            bnodeCache.putForExternalRead(createCacheKey(node.stringValue()), (KiWiAnonResource) node);
         } else if(node instanceof KiWiLiteral) {
             literalCache.putForExternalRead(LiteralCommons.createCacheKey((Literal) node), (KiWiLiteral) node);
         }
@@ -1783,7 +1785,7 @@ public class KiWiConnection {
 
     private void cacheTriple(KiWiTriple triple) {
         if(triple.getId() >= 0) {
-            tripleCache.putForExternalRead(triple.getId(),triple);
+            tripleCache.putForExternalRead(triple.getId(), triple);
         }
     }
 
@@ -2307,4 +2309,12 @@ public class KiWiConnection {
         }
 
     }
+
+    private static Long createCacheKey(String svalue) {
+        Hasher hasher = Hashing.goodFastHash(64).newHasher();
+        hasher.putString(svalue);
+        return hasher.hash().asLong();
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b454dd73/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
index 6ca027b..d55c5bd 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
@@ -18,6 +18,7 @@
 package org.apache.marmotta.kiwi.persistence;
 
 import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
+import org.apache.marmotta.kiwi.caching.TripleExternalizer;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.generator.IDGenerator;
 import org.apache.marmotta.kiwi.generator.SnowflakeIDGenerator;
@@ -138,9 +139,9 @@ public class KiWiPersistence {
 
     private void initCachePool() {
         if(infinispan != null) {
-            cacheManager = new KiWiCacheManager(infinispan,configuration);
+            cacheManager = new KiWiCacheManager(infinispan,configuration, new TripleExternalizer(this));
         } else {
-            cacheManager = new KiWiCacheManager(configuration);
+            cacheManager = new KiWiCacheManager(configuration, new TripleExternalizer(this));
         }
     }
 
@@ -491,4 +492,6 @@ public class KiWiPersistence {
     public IDGenerator getIdGenerator() {
         return idGenerator;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/b454dd73/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/registry/CacheTripleRegistry.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/registry/CacheTripleRegistry.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/registry/CacheTripleRegistry.java
index e35744e..2c68d68 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/registry/CacheTripleRegistry.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/registry/CacheTripleRegistry.java
@@ -65,12 +65,8 @@ public class CacheTripleRegistry implements KiWiTripleRegistry {
             transaction = new ArrayList<>();
             transactions.put(transactionId, transaction);
         }
-        Long old = cache.put(key.longHashCode(),tripleId);
+        cache.putForExternalRead(key.longHashCode(),tripleId);
         transaction.add(key.longHashCode());
-
-        if(old != null && old != tripleId) {
-            log.warn("registered a new triple ID for an already existing triple");
-        }
     }
 
     /**
@@ -112,6 +108,6 @@ public class CacheTripleRegistry implements KiWiTripleRegistry {
      */
     @Override
     public void deleteKey(IntArray key) {
-        cache.remove(key.longHashCode());
+        cache.removeAsync(key.longHashCode());
     }
 }