You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2014/03/05 13:24:20 UTC

[05/21] git commit: work on externalizer

work on externalizer


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/1698c6cb
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/1698c6cb
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/1698c6cb

Branch: refs/heads/MARMOTTA-450
Commit: 1698c6cb722eb5dc0db2a382282458fe7b12b321
Parents: 2e324d8
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Mar 3 17:27:19 2014 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Mar 3 17:27:19 2014 +0100

----------------------------------------------------------------------
 .../InfinispanEmbeddedCacheManager.java         | 310 ++++++++++---------
 .../kiwi/externalizer/TripleExternalizer.java   | 120 +++----
 .../remote/InfinispanRemoteCacheManager.java    | 114 +++++++
 .../apache/marmotta/kiwi/test/ClusterTest.java  | 160 ++++++++++
 .../test/externalizer/ExternalizerTest.java     |  27 +-
 .../apache/marmotta/kiwi/test/ClusterTest.java  | 157 ----------
 6 files changed, 521 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
index 35c2018..5818e0b 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
@@ -21,7 +21,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.marmotta.kiwi.caching.CacheManager;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.externalizer.*;
-import org.apache.marmotta.kiwi.persistence.KiWiPersistence;
 import org.infinispan.Cache;
 import org.infinispan.commons.CacheException;
 import org.infinispan.commons.marshall.AdvancedExternalizer;
@@ -44,9 +43,6 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED;
-import static org.apache.marmotta.kiwi.config.CacheMode.REPLICATED;
-
 /**
  * A class for managing the different caches that are used by the triple store.
  * <p/>
@@ -68,19 +64,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
 
     private EmbeddedCacheManager cacheManager;
 
-    private GlobalConfiguration globalConfiguration;
-
     // default configuration: distributed cache, 100000 entries, 300 seconds expiration, 60 seconds idle
     private Configuration defaultConfiguration;
 
-    private boolean clustered, embedded;
-
-    private KiWiConfiguration kiWiConfiguration;
-
-    private KiWiPersistence persistence;
+    private KiWiConfiguration config;
 
-
-    private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, loaderCache, registryCache;
+    private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, registryCache;
 
 
     /**
@@ -90,120 +79,169 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
      */
     public InfinispanEmbeddedCacheManager(KiWiConfiguration config) {
 
-        this.clustered = config.isClustered();
-        this.kiWiConfiguration = config;
-
-        if(clustered && (config.getCacheMode() == DISTRIBUTED || config.getCacheMode() == REPLICATED)) {
-            try {
-                String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
-
-                jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
-                jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
-
-
-                globalConfiguration = new GlobalConfigurationBuilder()
-                        .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
-                        .transport()
-                            .defaultTransport()
-                            .clusterName(config.getClusterName())
-                            .machineId("instance-" + config.getDatacenterId())
-                            .addProperty("configurationXml", jgroupsXml)
-                        .globalJmxStatistics()
-                            .jmxDomain("org.apache.marmotta.kiwi")
-                            .allowDuplicateDomains(true)
-                        .serialization()
-                            .addAdvancedExternalizer(getExternalizers())
-                        .build();
-            } catch (IOException ex) {
-                log.warn("error loading JGroups configuration from archive: {}", ex.getMessage());
-                log.warn("some configuration options will not be available");
-
-                globalConfiguration = new GlobalConfigurationBuilder()
-                        .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
-                            .transport()
-                            .defaultTransport()
-                            .clusterName(config.getClusterName())
-                            .machineId("instance-" + config.getDatacenterId())
-                            .addProperty("configurationFile", "jgroups-kiwi.xml")
-                        .globalJmxStatistics()
-                            .jmxDomain("org.apache.marmotta.kiwi")
-                            .allowDuplicateDomains(true)
-                        .serialization()
-                            .addAdvancedExternalizer(getExternalizers())
-                        .build();
 
-            }
+        this.config = config;
 
-            if(config.getCacheMode() == DISTRIBUTED) {
-                defaultConfiguration = new ConfigurationBuilder()
-                        .clustering()
-                            .cacheMode(CacheMode.DIST_ASYNC)
-                            .async()
-                                .asyncMarshalling()
-                            .l1()
-                                .lifespan(5, TimeUnit.MINUTES)
-                            .hash()
-                                .numOwners(2)
-                                .numSegments(40)
-                                .consistentHashFactory(new SyncConsistentHashFactory())
-                            .stateTransfer()
-                                .fetchInMemoryState(false)
-                        .eviction()
-                            .strategy(EvictionStrategy.LIRS)
-                            .maxEntries(100000)
-                        .expiration()
-                            .lifespan(30, TimeUnit.MINUTES)
-                            .maxIdle(10, TimeUnit.MINUTES)
-                        .build();
-            } else {
-                defaultConfiguration = new ConfigurationBuilder()
-                        .clustering()
-                            .cacheMode(CacheMode.REPL_ASYNC)
-                            .async()
-                                .asyncMarshalling()
-                        .stateTransfer()
-                            .fetchInMemoryState(false)
-                        .eviction()
-                            .strategy(EvictionStrategy.LIRS)
-                            .maxEntries(100000)
-                        .expiration()
-                            .lifespan(30, TimeUnit.MINUTES)
-                            .maxIdle(10, TimeUnit.MINUTES)
-                        .build();
+        try {
+            switch (config.getCacheMode()) {
+                case DISTRIBUTED:
+                    buildDistributedConfiguration(getExternalizers());
+                    break;
+                case REPLICATED:
+                    buildReplicatedConfiguration(getExternalizers());
+                    break;
+                case LOCAL:
+                    buildLocalConfiguration();
+                    break;
             }
-        } else {
-            globalConfiguration = new GlobalConfigurationBuilder()
-                    .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
-                    .globalJmxStatistics()
-                        .jmxDomain("org.apache.marmotta.kiwi")
-                        .allowDuplicateDomains(true)
-                    .build();
+        } catch (IOException ex) {
+            log.warn("error while building cache cluster configuration, reverting to local cache");
+            buildLocalConfiguration();
+        }
 
-            defaultConfiguration = new ConfigurationBuilder()
-                    .clustering()
-                        .cacheMode(CacheMode.LOCAL)
-                    .eviction()
-                        .strategy(EvictionStrategy.LIRS)
-                        .maxEntries(100000)
-                    .expiration()
+
+    }
+
+    /**
+     * Build a local cache configuration.
+     * <p/>
+     * In local cache mode, the cache is not shared among the servers in a cluster. Each machine keeps a local cache.
+     * This allows quick startups and eliminates network traffic in the cluster, but subsequent requests to different
+     * cluster members cannot benefit from the cached data.
+     */
+    protected void buildLocalConfiguration() {
+        GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+                .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+                .globalJmxStatistics()
+                    .jmxDomain("org.apache.marmotta.kiwi")
+                    .allowDuplicateDomains(true)
+                .build();
+
+        defaultConfiguration = new ConfigurationBuilder()
+                .clustering()
+                    .cacheMode(CacheMode.LOCAL)
+                .eviction()
+                    .strategy(EvictionStrategy.LIRS)
+                    .maxEntries(100000)
+                .expiration()
+                    .lifespan(5, TimeUnit.MINUTES)
+                    .maxIdle(1, TimeUnit.MINUTES)
+                .build();
+        cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
+
+        log.info("initialised local cache manager");
+    }
+
+    /**
+     * Build a distributed cache configuration.
+     * <p/>
+     * In distributed cache mode, the cluster forms a big hash table used as a cache. This makes efficient
+     * use of the large amount of memory available, but requires cache rebalancing and a lot of network transfers,
+     * especially when cluster members are restarted often.
+     */
+    protected void buildDistributedConfiguration(AdvancedExternalizer...externalizers) throws IOException {
+        String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
+
+        jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
+        jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
+
+
+        GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+                .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+                .transport()
+                    .defaultTransport()
+                    .clusterName(config.getClusterName())
+                    .machineId("instance-" + config.getDatacenterId())
+                    .addProperty("configurationXml", jgroupsXml)
+                .globalJmxStatistics()
+                    .jmxDomain("org.apache.marmotta.kiwi")
+                    .allowDuplicateDomains(true)
+                .serialization()
+                    .addAdvancedExternalizer(externalizers)
+                .build();
+
+        defaultConfiguration = new ConfigurationBuilder()
+                .clustering()
+                    .cacheMode(CacheMode.DIST_ASYNC)
+                    .async()
+                        .asyncMarshalling()
+                    .l1()
                         .lifespan(5, TimeUnit.MINUTES)
-                        .maxIdle(1, TimeUnit.MINUTES)
-                    .build();
+                    .hash()
+                        .numOwners(2)
+                        .numSegments(40)
+                        .consistentHashFactory(new SyncConsistentHashFactory())
+                    .stateTransfer()
+                        .fetchInMemoryState(false)
+                .eviction()
+                    .strategy(EvictionStrategy.LIRS)
+                    .maxEntries(100000)
+                .expiration()
+                    .lifespan(30, TimeUnit.MINUTES)
+                    .maxIdle(10, TimeUnit.MINUTES)
+                .build();
+        cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
 
-        }
+        log.info("initialised distributed cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
 
+    }
 
+    /**
+     * Build a replicated cache configuration.
+     * <p/>
+     * In replicated cache mode, each node in the cluster has an identical copy of all cache data. This allows
+     * very efficient cache lookups and reduces the rebalancing effort, but requires more memory.
+     */
+    protected void buildReplicatedConfiguration(AdvancedExternalizer...externalizers) throws IOException {
+        String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
+
+        jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
+        jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
+
+
+        GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+                .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+                .transport()
+                    .defaultTransport()
+                    .clusterName(config.getClusterName())
+                    .machineId("instance-" + config.getDatacenterId())
+                    .addProperty("configurationXml", jgroupsXml)
+                .globalJmxStatistics()
+                    .jmxDomain("org.apache.marmotta.kiwi")
+                    .allowDuplicateDomains(true)
+                .serialization()
+                    .addAdvancedExternalizer(externalizers)
+                .build();
+
+        defaultConfiguration = new ConfigurationBuilder()
+                .clustering()
+                    .cacheMode(CacheMode.REPL_ASYNC)
+                    .async()
+                        .asyncMarshalling()
+                    .stateTransfer()
+                        .fetchInMemoryState(false)
+                .eviction()
+                    .strategy(EvictionStrategy.LIRS)
+                    .maxEntries(100000)
+                .expiration()
+                    .lifespan(30, TimeUnit.MINUTES)
+                    .maxIdle(10, TimeUnit.MINUTES)
+                .build();
         cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
 
-        log.info("initialised Infinispan cache manager ({})", globalConfiguration.isClustered() ? "cluster name: "+globalConfiguration.transport().clusterName() : "single host");
+        log.info("initialised replicated cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
+    }
+
 
-        this.embedded = true;
+    protected boolean isClustered() {
+        return config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED ||
+               config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.REPLICATED;
     }
 
 
     private AdvancedExternalizer[] getExternalizers() {
         return new AdvancedExternalizer[] {
-                new TripleExternalizer(persistence),
+                new TripleExternalizer(),
                 new UriExternalizer(),
                 new BNodeExternalizer(),
                 new StringLiteralExternalizer(),
@@ -249,7 +287,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
                     .clustering()
                         .cacheMode(CacheMode.LOCAL)
                     .eviction()
-                        .maxEntries(kiWiConfiguration.getTripleCacheSize())
+                        .maxEntries(config.getTripleCacheSize())
                     .expiration()
                         .lifespan(60, TimeUnit.MINUTES)
                         .maxIdle(30, TimeUnit.MINUTES)
@@ -272,7 +310,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
         if(uriCache == null) {
             Configuration uriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                     .eviction()
-                        .maxEntries(kiWiConfiguration.getUriCacheSize())
+                        .maxEntries(config.getUriCacheSize())
                     .build();
             cacheManager.defineConfiguration(URI_CACHE, uriConfiguration);
 
@@ -292,7 +330,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
         if(bnodeCache == null) {
             Configuration bnodeConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                     .eviction()
-                        .maxEntries(kiWiConfiguration.getBNodeCacheSize())
+                        .maxEntries(config.getBNodeCacheSize())
                     .build();
             cacheManager.defineConfiguration(BNODE_CACHE, bnodeConfiguration);
 
@@ -312,7 +350,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
         if(literalCache == null) {
             Configuration literalConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                     .eviction()
-                        .maxEntries(kiWiConfiguration.getLiteralCacheSize())
+                        .maxEntries(config.getLiteralCacheSize())
                     .build();
             cacheManager.defineConfiguration(LITERAL_CACHE, literalConfiguration);
 
@@ -328,12 +366,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
      */
     public Cache getNamespaceUriCache() {
         if(nsUriCache == null) {
-            if(clustered) {
+            if(isClustered()) {
                 Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                         .clustering()
                             .cacheMode(CacheMode.REPL_ASYNC)
                         .eviction()
-                            .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+                            .maxEntries(config.getNamespaceCacheSize())
                         .expiration()
                             .lifespan(1, TimeUnit.DAYS)
                         .build();
@@ -341,7 +379,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
             } else {
                 Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                         .eviction()
-                            .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+                            .maxEntries(config.getNamespaceCacheSize())
                         .expiration()
                             .lifespan(1, TimeUnit.HOURS)
                         .build();
@@ -359,12 +397,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
      */
     public Cache getNamespacePrefixCache() {
         if(nsPrefixCache == null) {
-            if(clustered) {
+            if(isClustered()) {
                 Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                         .clustering()
                             .cacheMode(CacheMode.REPL_ASYNC)
                         .eviction()
-                            .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+                            .maxEntries(config.getNamespaceCacheSize())
                         .expiration()
                             .lifespan(1, TimeUnit.DAYS)
                         .build();
@@ -373,7 +411,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
             } else {
                 Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
                         .eviction()
-                            .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+                            .maxEntries(config.getNamespaceCacheSize())
                         .expiration()
                             .lifespan(1, TimeUnit.HOURS)
                         .build();
@@ -395,7 +433,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
      */
     public Cache getRegistryCache() {
         if(registryCache == null) {
-            if(clustered) {
+            if(isClustered()) {
                 Configuration registryConfiguration = new ConfigurationBuilder()
                     .clustering()
                         .cacheMode(CacheMode.REPL_SYNC)
@@ -436,31 +474,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
     }
 
     /**
-     * Return the Infinispan cache manager used by the caching infrastructure.
-     *
-     * @return
-     */
-    public EmbeddedCacheManager getCacheManager() {
-        return cacheManager;
-    }
-
-    /**
-     * Return the global cache manager configuration used by the caching infrastructure.
-     * @return
-     */
-    public GlobalConfiguration getGlobalConfiguration() {
-        return globalConfiguration;
-    }
-
-    /**
-     * Return the default cache configuration used by the caching infrastructure.
-     * @return
-     */
-    public Configuration getDefaultConfiguration() {
-        return defaultConfiguration;
-    }
-
-    /**
      * Clear all caches managed by this cache manager.
      */
     public void clear() {
@@ -479,7 +492,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
         bnodeCache    = null;
         nsPrefixCache = null;
         nsUriCache    = null;
-        loaderCache   = null;
         registryCache = null;
     }
 
@@ -488,7 +500,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
      */
     public void shutdown() {
         try {
-            if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) {
+            if(cacheManager.getStatus() == ComponentStatus.RUNNING) {
                 log.warn("shutting down cache manager ...");
 //                if(cacheManager.getTransport() != null) {
 //                    log.info("... shutting down transport ...");

http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
index e4c584c..05e6933 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
@@ -17,21 +17,17 @@
 
 package org.apache.marmotta.kiwi.externalizer;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
 import org.apache.marmotta.kiwi.model.rdf.KiWiResource;
 import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
 import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
-import org.apache.marmotta.kiwi.persistence.KiWiConnection;
-import org.apache.marmotta.kiwi.persistence.KiWiPersistence;
 import org.infinispan.commons.marshall.AdvancedExternalizer;
 import org.infinispan.commons.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.sql.SQLException;
 import java.util.Date;
 import java.util.Set;
 
@@ -41,13 +37,10 @@ import java.util.Set;
  */
 public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
 
-    private static Logger log = LoggerFactory.getLogger(TripleExternalizer.class);
 
-    private KiWiPersistence persistence;
+    public static final int MODE_DEFAULT = 1;
+    public static final int MODE_PREFIX  = 2;
 
-    public TripleExternalizer(KiWiPersistence persistence) {
-        this.persistence = persistence;
-    }
 
     @Override
     public Set<Class<? extends KiWiTriple>> getTypeClasses() {
@@ -62,11 +55,39 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
     @Override
     public void writeObject(ObjectOutput output, KiWiTriple object) throws IOException {
         output.writeLong(object.getId());
-        output.writeLong(object.getSubject().getId());
-        output.writeLong(object.getPredicate().getId());
-        output.writeLong(object.getObject().getId());
-        output.writeLong(object.getContext() != null ? object.getContext().getId() : -1L);
-        output.writeLong(object.getCreator() != null ? object.getCreator().getId() : -1L);
+
+        // in case subject and object are both uris we use a special prefix-compressed mode
+        if(object.getSubject().isUriResource() && object.getObject().isUriResource()) {
+            String sUri = object.getSubject().stringValue();
+            String oUri = object.getObject().stringValue();
+
+            String prefix = StringUtils.getCommonPrefix(sUri,oUri);
+
+            output.writeByte(MODE_PREFIX);
+            output.writeInt(prefix.length());
+            output.writeChars(prefix);
+
+            output.writeLong(object.getSubject().getId());
+            output.writeInt(sUri.length() - prefix.length());
+            output.writeChars(sUri.substring(prefix.length()));
+            output.writeLong(object.getSubject().getCreated().getTime());
+
+            output.writeObject(object.getPredicate());
+
+            output.writeLong(object.getObject().getId());
+            output.writeInt(oUri.length() - prefix.length());
+            output.writeChars(oUri.substring(prefix.length()));
+            output.writeLong(object.getObject().getCreated().getTime());
+        } else {
+            output.writeByte(MODE_DEFAULT);
+
+            output.writeObject(object.getSubject());
+            output.writeObject(object.getPredicate());
+            output.writeObject(object.getObject());
+        }
+
+        output.writeObject(object.getContext());
+        output.writeObject(object.getCreator());
         output.writeBoolean(object.isDeleted());
         output.writeBoolean(object.isInferred());
         output.writeBoolean(object.isNewTriple());
@@ -80,48 +101,33 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
 
     @Override
     public KiWiTriple readObject(ObjectInput input) throws IOException, ClassNotFoundException {
-        try {
-            KiWiConnection con = persistence.getConnection();
-            try {
-                KiWiTriple result = new KiWiTriple();
-                result.setId(input.readLong());
-
-                long[] nodeIds = new long[5];
-                for(int i=0; i<5; i++) {
-                    nodeIds[0] = input.readLong();
-                }
-                KiWiNode[] nodes = con.loadNodesByIds(nodeIds);
-
-                result.setSubject((KiWiResource) nodes[0]);
-                result.setPredicate((KiWiUriResource) nodes[1]);
-                result.setObject(nodes[2]);
-
-                if(nodes[3] != null) {
-                    result.setContext((KiWiResource) nodes[3]);
-                }
-                if(nodes[4] != null) {
-                    result.setCreator((KiWiResource) nodes[4]);
-                }
-
-                result.setDeleted(input.readBoolean());
-                result.setInferred(input.readBoolean());
-                result.setNewTriple(input.readBoolean());
-
-                result.setCreated(new Date(input.readLong()));
-
-                long deletedAt = input.readLong();
-                if(deletedAt > 0) {
-                    result.setDeletedAt(new Date(deletedAt));
-                }
-
-
-                return result;
-            } finally {
-                con.commit();
-                con.close();
-            }
-        } catch (SQLException ex) {
-            throw new IOException(ex);
+
+        KiWiTriple result = new KiWiTriple();
+        result.setId(input.readLong());
+
+        int mode = input.readInt();
+        if(mode == MODE_PREFIX) {
+            String prefix =
+        }
+
+
+        result.setSubject((KiWiResource) input.readObject());
+        result.setPredicate((KiWiUriResource) input.readObject());
+        result.setObject((KiWiNode) input.readObject());
+        result.setContext((KiWiResource) input.readObject());
+        result.setCreator((KiWiResource) input.readObject());
+        result.setDeleted(input.readBoolean());
+        result.setInferred(input.readBoolean());
+        result.setNewTriple(input.readBoolean());
+
+        result.setCreated(new Date(input.readLong()));
+
+        long deletedAt = input.readLong();
+        if(deletedAt > 0) {
+            result.setDeletedAt(new Date(deletedAt));
         }
+
+
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
index c40feb8..6f5034a 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
@@ -19,6 +19,9 @@ package org.apache.marmotta.kiwi.remote;
 
 import org.apache.marmotta.kiwi.caching.CacheManager;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.model.rdf.*;
+
+import java.util.Map;
 
 /**
  * Implementation of an Infinispan cache manager with a remote (client-server) cache.
@@ -34,7 +37,118 @@ public class InfinispanRemoteCacheManager implements CacheManager {
     }
 
 
+    /**
+     * Return the node id -> node cache from the cache manager. This cache is heavily used to look up
+     * nodes when querying or loading triples and should therefore have a decent size (default 500.000 elements).
+     *
+     * @return a map containing the node id -> node mappings
+     */
+    @Override
+    public Map<Long, KiWiNode> getNodeCache() {
+        return null;
+    }
+
+    /**
+     * Return the triple id -> triple cache from the cache manager. This cache is used for speeding up the
+     * construction of query results.
+     *
+     * @return a map containing the triple id -> triple mappings
+     */
+    @Override
+    public Map<Long, KiWiTriple> getTripleCache() {
+        return null;
+    }
 
+    /**
+     * Return the uri -> KiWiUriResource cache from the cache manager. This cache is used when constructing new
+     * KiWiUriResources to avoid a database lookup.
+     *
+     * @return a map containing the uri -> KiWiUriResource mappings
+     */
+    @Override
+    public Map<String, KiWiUriResource> getUriCache() {
+        return null;
+    }
 
+    /**
+     * Return the anonId -> KiWiAnonResource cache from the cache manager. This cache is used when constructing new
+     * KiWiAnonResources to avoid a database lookup.
+     *
+     * @return a map containing the anonId -> KiWiAnonResource mappings
+     */
+    @Override
+    public Map<String, KiWiAnonResource> getBNodeCache() {
+        return null;
+    }
 
+    /**
+     * Return the literal cache key -> KiWiLiteral cache from the cache manager. This cache is used when constructing new
+     * KiWiLiterals to avoid a database lookup.
+     *
+     * @return a map containing the literal cache key -> KiWiLiteral mappings
+     * @see org.apache.marmotta.commons.sesame.model.LiteralCommons#createCacheKey(String, java.util.Locale, String)
+     */
+    @Override
+    public Map<String, KiWiLiteral> getLiteralCache() {
+        return null;
+    }
+
+    /**
+     * Return the URI -> namespace cache from the cache manager. Used for looking up namespaces
+     *
+     * @return a map containing the namespace URI -> namespace mappings
+     */
+    @Override
+    public Map<String, KiWiNamespace> getNamespaceUriCache() {
+        return null;
+    }
+
+    /**
+     * Return the prefix -> namespace cache from the cache manager. Used for looking up namespaces
+     *
+     * @return a map containing the prefix -> namespace mappings
+     */
+    @Override
+    public Map<String, KiWiNamespace> getNamespacePrefixCache() {
+        return null;
+    }
+
+    /**
+     * Create and return the cache used by the CacheTripleRegistry. This is an unlimited synchronous replicated
+     * cache and should be used with care.
+     *
+     * @return the map backing the triple registry
+     */
+    @Override
+    public Map<Long, Long> getRegistryCache() {
+        return null;
+    }
+
+    /**
+     * Get the cache with the given name from the cache manager. Can be used to request additional
+     * caches from the cache manager that are not covered by explicit methods.
+     *
+     * @param name the name of the cache to retrieve
+     * @return the cache with the given name
+     */
+    @Override
+    public Map getCacheByName(String name) {
+        return null;
+    }
+
+    /**
+     * Clear all caches managed by this cache manager.
+     */
+    @Override
+    public void clear() {
+
+    }
+
+    /**
+     * Shut down this cache manager instance. Currently a no-op in this remote implementation.
+     */
+    @Override
+    public void shutdown() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
new file mode 100644
index 0000000..0b6823c
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.marmotta.kiwi.caching.CacheManager;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.embedded.InfinispanEmbeddedCacheManagerFactory;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.junit.*;
+import org.openrdf.model.URI;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests cache synchronization between two KiWiStore instances clustered via the embedded Infinispan cache manager.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class ClusterTest {
+
+    private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
+
+    KiWiConfiguration config1, config2;
+
+    KiWiStore store1, store2;
+
+    Repository repository1, repository2;
+
+    CacheManager cacheManager1, cacheManager2;
+
+    @Before
+    public void setup() throws RepositoryException {
+        config1 = new KiWiConfiguration(
+                "default-H2",
+                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                "kiwi", "kiwi",
+                new H2Dialect());
+        config1.setDatacenterId(1);
+        config1.setClustered(true);
+        config1.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName());
+
+        config2 = new KiWiConfiguration(
+                "default-H2",
+                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                "kiwi", "kiwi",
+                new H2Dialect());
+        config2.setDatacenterId(2);
+        config2.setClustered(true);
+        config2.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName());
+
+
+
+    }
+
+    public void setupCluster(int port1, int port2) throws RepositoryException {
+        config1.setClusterPort(port1);
+        config2.setClusterPort(port2);
+
+        store1 = new KiWiStore(config1);
+        store2 = new KiWiStore(config2);
+
+        repository1 = new SailRepository(store1);
+        repository2 = new SailRepository(store2);
+
+        repository1.initialize();
+        repository2.initialize();
+
+        cacheManager1 = store1.getPersistence().getCacheManager();
+        cacheManager2 = store2.getPersistence().getCacheManager();
+    }
+
+
+    @After
+    public void teardown() throws RepositoryException {
+        repository1.shutDown();
+        repository2.shutDown();
+    }
+
+
+    @Test
+    @Ignore
+    public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
+        setupCluster(61222,61222);
+
+        log.info("testing cache synchronization ...");
+
+        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
+
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(100);
+
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+        Assert.assertNotNull(u2);
+        Assert.assertEquals(u,u2);
+    }
+
+    @Test
+    @Ignore
+    public void testDisjointClusters() throws InterruptedException, RepositoryException {
+        setupCluster(61224,61225);
+
+        log.info("testing caches on different ports ...");
+
+        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
+
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(100);
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+        Assert.assertNull(u2);
+    }
+
+
+    private static Long createCacheKey(String svalue) {
+        Hasher hasher = Hashing.goodFastHash(64).newHasher();
+        hasher.putString(svalue);
+        return hasher.hash().asLong();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
index fe7b701..9f52813 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
@@ -18,10 +18,8 @@
 package org.apache.marmotta.kiwi.test.externalizer;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.externalizer.*;
+import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.test.TestValueFactory;
 import org.infinispan.commons.marshall.AdvancedExternalizer;
 import org.infinispan.commons.marshall.StreamingMarshaller;
@@ -123,6 +121,16 @@ public class ExternalizerTest {
     }
 
 
+    @Test
+    public void testTriple() throws Exception {
+        KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiNode o = (KiWiNode) randomNode();
+        KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o);
+
+        marshall(t, new TripleExternalizer());
+    }
+
     /**
      * Run the given object through the marshaller using an in-memory stream.
      * @param origin
@@ -130,6 +138,17 @@ public class ExternalizerTest {
      * @return
      */
     private <T> void marshall(T origin, AdvancedExternalizer<T> externalizer) throws IOException, ClassNotFoundException, InterruptedException {
+        log.info("- testing Java ObjectStream ...");
+        ByteArrayOutputStream outBytesOS = new ByteArrayOutputStream();
+        ObjectOutputStream outOS = new ObjectOutputStream(outBytesOS);
+
+        outOS.writeObject(origin);
+
+        outOS.close();
+
+        log.info("  object {}: serialized with {} bytes", origin, outBytesOS.size());
+
+
         log.info("- testing externalizer directly ...");
         ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(outBytes);

http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
deleted file mode 100644
index 5f4d719..0000000
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.marmotta.kiwi.test;
-
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
-import org.apache.marmotta.kiwi.config.KiWiConfiguration;
-import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
-import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.junit.*;
-import org.openrdf.model.URI;
-import org.openrdf.repository.Repository;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Add file description here!
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class ClusterTest {
-
-    private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
-
-    KiWiConfiguration config1, config2;
-
-    KiWiStore store1, store2;
-
-    Repository repository1, repository2;
-
-    KiWiCacheManager cacheManager1, cacheManager2;
-
-    @Before
-    public void setup() throws RepositoryException {
-        config1 = new KiWiConfiguration(
-                "default-H2",
-                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
-                "kiwi", "kiwi",
-                new H2Dialect());
-        config1.setDatacenterId(1);
-        config1.setClustered(true);
-
-        config2 = new KiWiConfiguration(
-                "default-H2",
-                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
-                "kiwi", "kiwi",
-                new H2Dialect());
-        config2.setDatacenterId(2);
-        config2.setClustered(true);
-
-
-
-    }
-
-    public void setupCluster(int port1, int port2) throws RepositoryException {
-        config1.setClusterPort(port1);
-        config2.setClusterPort(port2);
-
-        store1 = new KiWiStore(config1);
-        store2 = new KiWiStore(config2);
-
-        repository1 = new SailRepository(store1);
-        repository2 = new SailRepository(store2);
-
-        repository1.initialize();
-        repository2.initialize();
-
-        cacheManager1 = store1.getPersistence().getCacheManager();
-        cacheManager2 = store2.getPersistence().getCacheManager();
-    }
-
-
-    @After
-    public void teardown() throws RepositoryException {
-        repository1.shutDown();
-        repository2.shutDown();
-    }
-
-
-    @Test
-    @Ignore
-    public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
-        setupCluster(61222,61222);
-
-        log.info("testing cache synchronization ...");
-
-        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
-        // give the cluster some time to performance asynchronous balancing
-        Thread.sleep(100);
-
-
-        log.debug("test if resource is in cache where it was created ...");
-        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u1);
-        Assert.assertEquals(u,u1);
-
-        log.debug("test if resource has been synced to other cache in cluster ...");
-        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u2);
-        Assert.assertEquals(u,u2);
-    }
-
-    @Test
-    @Ignore
-    public void testDisjointClusters() throws InterruptedException, RepositoryException {
-        setupCluster(61224,61225);
-
-        log.info("testing caches on different ports ...");
-
-        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
-        // give the cluster some time to performance asynchronous balancing
-        Thread.sleep(100);
-
-        log.debug("test if resource is in cache where it was created ...");
-        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u1);
-        Assert.assertEquals(u,u1);
-
-        log.debug("test if resource has been synced to other cache in cluster ...");
-        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNull(u2);
-    }
-
-
-    private static Long createCacheKey(String svalue) {
-        Hasher hasher = Hashing.goodFastHash(64).newHasher();
-        hasher.putString(svalue);
-        return hasher.hash().asLong();
-    }
-
-}