You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2014/03/17 11:23:02 UTC

[11/32] git commit: - implementation of Hazelcast cache backend - common base class for cache cluster tests

- implementation of Hazelcast cache backend
- common base class for cache cluster tests


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/62c44dea
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/62c44dea
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/62c44dea

Branch: refs/heads/develop
Commit: 62c44deafc9cf6332087368a8372dad37cb3032f
Parents: 27a0523
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Mar 3 22:30:09 2014 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Mar 3 22:30:09 2014 +0100

----------------------------------------------------------------------
 .../org/apache/marmotta/commons/io/DataIO.java  |  11 +-
 libraries/kiwi/kiwi-caching-hazelcast/pom.xml   |  59 +++++
 .../caching/HazelcastCacheManager.java          | 130 +++++++++-
 .../marmotta/kiwi/hazelcast/util/AsyncMap.java  | 102 ++++++++
 .../kiwi/test/cluster/HazelcastClusterTest.java |  36 +++
 .../kiwi/test/cluster/SerializerTest.java       | 241 +++++++++++++++++++
 .../InfinispanEmbeddedCacheManager.java         |   6 +-
 .../apache/marmotta/kiwi/test/ClusterTest.java  | 160 ------------
 .../kiwi/test/cluster/BaseClusterTest.java      | 160 ++++++++++++
 9 files changed, 727 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java
index d236ef7..ca4b53d 100644
--- a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java
+++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/io/DataIO.java
@@ -33,7 +33,9 @@ public class DataIO {
     public static void writeString(DataOutput out, String s) throws IOException {
         if(s != null) {
             out.writeInt(s.length());
-            out.writeChars(s);
+            for(int i=0; i<s.length(); i++) {
+                out.writeChar(s.charAt(i));
+            }
         } else {
             out.writeInt(-1);
         }
@@ -44,11 +46,12 @@ public class DataIO {
         int len = in.readInt();
 
         if(len >= 0) {
-            StringBuilder builder = new StringBuilder();
+            char[] result = new char[len];
+
             for(int i=0; i<len; i++) {
-                builder.append(in.readChar());
+                result[i] = in.readChar();
             }
-            return builder.toString();
+            return new String(result);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/pom.xml
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-hazelcast/pom.xml b/libraries/kiwi/kiwi-caching-hazelcast/pom.xml
index 833fece..b1f406a 100644
--- a/libraries/kiwi/kiwi-caching-hazelcast/pom.xml
+++ b/libraries/kiwi/kiwi-caching-hazelcast/pom.xml
@@ -48,6 +48,65 @@
             <groupId>com.hazelcast</groupId>
             <artifactId>hazelcast</artifactId>
         </dependency>
+
+
+        <!-- Testing -->
+        <dependency>
+            <artifactId>junit</artifactId>
+            <groupId>junit</groupId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.marmotta</groupId>
+            <artifactId>kiwi-triplestore</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <artifactId>hamcrest-core</artifactId>
+            <groupId>org.hamcrest</groupId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <artifactId>hamcrest-library</artifactId>
+            <groupId>org.hamcrest</groupId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+            <optional>true</optional> <!-- GPL licensed, no dependency -->
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-rdfxml</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java
index 446fa4b..cc037c9 100644
--- a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/caching/HazelcastCacheManager.java
@@ -18,11 +18,19 @@
 package org.apache.marmotta.kiwi.hazelcast.caching;
 
 import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.MaxSizeConfig;
 import com.hazelcast.config.SerializerConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
 import org.apache.marmotta.kiwi.caching.CacheManager;
+import org.apache.marmotta.kiwi.config.CacheMode;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.hazelcast.serializer.*;
+import org.apache.marmotta.kiwi.hazelcast.util.AsyncMap;
 import org.apache.marmotta.kiwi.model.rdf.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -33,16 +41,56 @@ import java.util.Map;
  */
 public class HazelcastCacheManager implements CacheManager {
 
+    public static final String TRIPLE_CACHE = "triple-cache";
+    public static final String URI_CACHE = "uri-cache";
+    public static final String BNODE_CACHE = "bnode-cache";
+    public static final String LITERAL_CACHE = "literal-cache";
+    public static final String NODE_CACHE = "node-cache";
+    public static final String NS_PREFIX_CACHE = "ns-prefix-cache";
+    public static final String NS_URI_CACHE = "ns-uri-cache";
+    public static final String REGISTRY_CACHE = "registry-cache";
+
+    private static Logger log = LoggerFactory.getLogger(HazelcastCacheManager.class);
+
     private KiWiConfiguration configuration;
 
     private Config hcConfiguration;
 
+    private HazelcastInstance hazelcast;
+
+    private AsyncMap<Long,KiWiNode> nodeCache;
+    private AsyncMap<Long,KiWiTriple> tripleCache;
+    private AsyncMap<String,KiWiUriResource> uriCache;
+    private AsyncMap<String,KiWiAnonResource> bnodeCache;
+    private AsyncMap<String,KiWiLiteral> literalCache;
+    private AsyncMap<String,KiWiNamespace> nsPrefixCache;
+    private AsyncMap<String,KiWiNamespace> nsUriCache;
+
+    private Map<Long,Long> registryCache;
+
     public HazelcastCacheManager(KiWiConfiguration configuration) {
         this.configuration = configuration;
 
         hcConfiguration = new Config();
+        hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true);
+        hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(configuration.getClusterPort());
+        hcConfiguration.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configuration.getClusterAddress());
+        hcConfiguration.getGroupConfig().setName(configuration.getClusterName());
+
+
+
 
         setupSerializers();
+        setupCaches();
+
+        hazelcast = Hazelcast.newHazelcastInstance(hcConfiguration);
+
+
+        log.info("initialised Hazelcast distributed cache manager (cluster name: {})",  configuration.getClusterName());
+
+        if(configuration.getCacheMode() != CacheMode.DISTRIBUTED) {
+            log.warn("Hazelcast only supports distributed cache mode (mode configuration was {})", configuration.getCacheMode());
+        }
     }
 
     private void setupSerializers() {
@@ -71,6 +119,30 @@ public class HazelcastCacheManager implements CacheManager {
         hcConfiguration.getSerializationConfig().addSerializerConfig(scUri);
     }
 
+    private void setupCaches() {
+        setupMapConfig(NODE_CACHE, configuration.getNodeCacheSize());
+        setupMapConfig(TRIPLE_CACHE, configuration.getTripleCacheSize());
+        setupMapConfig(URI_CACHE, configuration.getUriCacheSize());
+        setupMapConfig(BNODE_CACHE, configuration.getBNodeCacheSize());
+        setupMapConfig(LITERAL_CACHE, configuration.getLiteralCacheSize());
+        setupMapConfig(NS_PREFIX_CACHE, configuration.getNamespaceCacheSize());
+        setupMapConfig(NS_URI_CACHE, configuration.getNamespaceCacheSize());
+
+    }
+
+    /** Registers a map configuration named {@code name}: at most {@code size} entries per partition, LRU eviction. */
+    private void setupMapConfig(String name, int size) {
+        MapConfig cfg = new MapConfig(name); // key the config by the requested cache, so each cache gets its own limits
+        cfg.setMaxSizeConfig(new MaxSizeConfig(size, MaxSizeConfig.MaxSizePolicy.PER_PARTITION));
+        cfg.setAsyncBackupCount(1);     // one asynchronous backup copy
+        cfg.setBackupCount(0);          // no synchronous backups
+        cfg.setEvictionPolicy(MapConfig.EvictionPolicy.LRU);
+        cfg.setMaxIdleSeconds(600);     // 10 minutes
+        cfg.setTimeToLiveSeconds(3600); // 1 hour
+
+        hcConfiguration.addMapConfig(cfg);
+    }
+
     /**
      * Return the node id -> node cache from the cache manager. This cache is heavily used to lookup
      * nodes when querying or loading triples and should therefore have a decent size (default 500.000 elements).
@@ -79,7 +151,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<Long, KiWiNode> getNodeCache() {
-        return null;
+        if(nodeCache == null) {
+            nodeCache = new AsyncMap<>(hazelcast.<Long,KiWiNode>getMap(NODE_CACHE));
+        }
+
+        return nodeCache;
     }
 
     /**
@@ -90,7 +166,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<Long, KiWiTriple> getTripleCache() {
-        return null;
+        if(tripleCache == null) {
+            tripleCache = new AsyncMap<>(hazelcast.<Long,KiWiTriple>getMap(TRIPLE_CACHE));
+        }
+
+        return tripleCache;
     }
 
     /**
@@ -101,7 +181,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<String, KiWiUriResource> getUriCache() {
-        return null;
+        if(uriCache == null) {
+            uriCache = new AsyncMap<>(hazelcast.<String,KiWiUriResource>getMap(URI_CACHE));
+        }
+
+        return uriCache;
     }
 
     /**
@@ -112,7 +196,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<String, KiWiAnonResource> getBNodeCache() {
-        return null;
+        if(bnodeCache == null) {
+            bnodeCache = new AsyncMap<>(hazelcast.<String,KiWiAnonResource>getMap(BNODE_CACHE));
+        }
+
+        return bnodeCache;
     }
 
     /**
@@ -124,7 +212,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<String, KiWiLiteral> getLiteralCache() {
-        return null;
+        if(literalCache == null) {
+            literalCache = new AsyncMap<>(hazelcast.<String,KiWiLiteral>getMap(LITERAL_CACHE));
+        }
+
+        return literalCache;
     }
 
     /**
@@ -134,7 +226,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<String, KiWiNamespace> getNamespaceUriCache() {
-        return null;
+        if(nsUriCache == null) {
+            nsUriCache = new AsyncMap<>(hazelcast.<String,KiWiNamespace>getMap(NS_URI_CACHE));
+        }
+
+        return nsUriCache;
     }
 
     /**
@@ -144,7 +240,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<String, KiWiNamespace> getNamespacePrefixCache() {
-        return null;
+        if(nsPrefixCache == null) {
+            nsPrefixCache = new AsyncMap<>(hazelcast.<String,KiWiNamespace>getMap(NS_PREFIX_CACHE));
+        }
+
+        return nsPrefixCache;
     }
 
     /**
@@ -155,7 +255,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map<Long, Long> getRegistryCache() {
-        return null;
+        if(registryCache == null) {
+            registryCache = hazelcast.<Long, Long>getMap(REGISTRY_CACHE);
+        }
+
+        return registryCache;
     }
 
     /**
@@ -167,7 +271,7 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public Map getCacheByName(String name) {
-        return null;
+        return hazelcast.getMap(name);
     }
 
     /**
@@ -175,7 +279,11 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public void clear() {
-
+        for(Map m : new Map[] { nodeCache, tripleCache, uriCache, bnodeCache, literalCache, nsPrefixCache, nsUriCache, registryCache}) {
+            if(m != null) {
+                m.clear();
+            }
+        }
     }
 
     /**
@@ -183,6 +291,6 @@ public class HazelcastCacheManager implements CacheManager {
      */
     @Override
     public void shutdown() {
-
+        hazelcast.shutdown();
     }
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java
new file mode 100644
index 0000000..7aa4722
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching-hazelcast/src/main/java/org/apache/marmotta/kiwi/hazelcast/util/AsyncMap.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.hazelcast.util;
+
+import com.hazelcast.core.IMap;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
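+ * A Map wrapper mapping write methods to their asynchronous equivalents in Hazelcast.
+ * Because the writes are asynchronous, put() and remove() always return null rather than the previous value.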
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class AsyncMap<K,V> implements Map<K,V> {
+
+    private IMap<K,V> delegate;
+
+    public AsyncMap(IMap<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public int size() {
+        return delegate.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return delegate.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object o) {
+        return delegate.containsKey(o);
+    }
+
+    @Override
+    public boolean containsValue(Object o) {
+        return delegate.containsValue(o);
+    }
+
+    @Override
+    public V get(Object o) {
+        return delegate.get(o);
+    }
+
+    @Override
+    public V put(K k, V v) {
+        delegate.putAsync(k,v);
+        return null;
+    }
+
+    @Override
+    public V remove(Object o) {
+        delegate.removeAsync((K)o);
+        return null;
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+        for(Entry<? extends K, ? extends V> entry : map.entrySet()) {
+            delegate.putAsync(entry.getKey(),entry.getValue());
+        }
+    }
+
+    @Override
+    public void clear() {
+        delegate.clear();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return delegate.keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return delegate.values();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return delegate.entrySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java
new file mode 100644
index 0000000..c53b704
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/HazelcastClusterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test.cluster;
+
+import org.apache.marmotta.kiwi.caching.CacheManagerType;
+import org.junit.BeforeClass;
+
+/**
+ * Cluster test set up with the Hazelcast cache manager; the test logic is inherited from BaseClusterTest.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class HazelcastClusterTest extends BaseClusterTest {
+
+
+    @BeforeClass
+    public static void setup() {
+        ClusterTestSupport s = new ClusterTestSupport(CacheManagerType.HAZELCAST);
+        s.setup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java
new file mode 100644
index 0000000..675db87
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching-hazelcast/src/test/java/org/apache/marmotta/kiwi/test/cluster/SerializerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test.cluster;
+
+import com.hazelcast.config.SerializationConfig;
+import com.hazelcast.config.SerializerConfig;
+import com.hazelcast.nio.serialization.*;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.commons.vocabulary.XSD;
+import org.apache.marmotta.kiwi.hazelcast.serializer.*;
+import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.test.TestValueFactory;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDFS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Random;
+
+/**
+ * Test the different serializer implementations we provide for Hazelcast
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class SerializerTest {
+
+    private static ValueFactory valueFactory = new TestValueFactory();
+
+    private static Random rnd = new Random();
+
+    private static Logger log = LoggerFactory.getLogger(SerializerTest.class);
+
+
+    private static SerializationService simpleService, fullService;
+
+    @BeforeClass
+    public static void setup() {
+        simpleService = new SerializationServiceBuilder().build();
+
+
+        SerializationConfig config = new SerializationConfig();
+        SerializerConfig scBNode = new SerializerConfig().setImplementation(new BNodeSerializer()).setTypeClass(KiWiAnonResource.class);
+        config.addSerializerConfig(scBNode);
+
+        SerializerConfig scBoolean = new SerializerConfig().setImplementation(new BooleanLiteralSerializer()).setTypeClass(KiWiBooleanLiteral.class);
+        config.addSerializerConfig(scBoolean);
+
+        SerializerConfig scDate = new SerializerConfig().setImplementation(new DateLiteralSerializer()).setTypeClass(KiWiDateLiteral.class);
+        config.addSerializerConfig(scDate);
+
+        SerializerConfig scDouble = new SerializerConfig().setImplementation(new DoubleLiteralSerializer()).setTypeClass(KiWiDoubleLiteral.class);
+        config.addSerializerConfig(scDouble);
+
+        SerializerConfig scInt = new SerializerConfig().setImplementation(new IntLiteralSerializer()).setTypeClass(KiWiIntLiteral.class);
+        config.addSerializerConfig(scInt);
+
+        SerializerConfig scString = new SerializerConfig().setImplementation(new StringLiteralSerializer()).setTypeClass(KiWiStringLiteral.class);
+        config.addSerializerConfig(scString);
+
+        SerializerConfig scTriple = new SerializerConfig().setImplementation(new TripleSerializer()).setTypeClass(KiWiTriple.class);
+        config.addSerializerConfig(scTriple);
+
+        SerializerConfig scUri = new SerializerConfig().setImplementation(new UriSerializer()).setTypeClass(KiWiUriResource.class);
+        config.addSerializerConfig(scUri);
+
+
+        fullService   = new SerializationServiceBuilder().setConfig(config).build();
+
+
+    }
+
+
+    @Test
+    public void testUriResource() throws Exception {
+        marshall((KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8)), new UriSerializer());
+    }
+
+    @Test
+    public void testCompressedUriResource() throws Exception {
+        marshall((KiWiUriResource) valueFactory.createURI(XSD.Double.stringValue()), new UriSerializer());
+        marshall((KiWiUriResource) valueFactory.createURI(RDFS.LABEL.stringValue()), new UriSerializer());
+        marshall((KiWiUriResource) valueFactory.createURI(OWL.SAMEAS.stringValue()), new UriSerializer());
+    }
+
+
+    @Test
+    public void testBNode() throws Exception {
+        marshall((KiWiAnonResource) valueFactory.createBNode(), new BNodeSerializer());
+    }
+
+    @Test
+    public void testStringLiteral() throws Exception {
+        marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40)), new StringLiteralSerializer());
+    }
+
+    @Test
+    public void testLangLiteral() throws Exception {
+        marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40),"en"), new StringLiteralSerializer());
+    }
+
+    @Test
+    public void testTypeLiteral() throws Exception {
+        marshall((KiWiStringLiteral) valueFactory.createLiteral(RandomStringUtils.randomAscii(40),valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8))), new StringLiteralSerializer());
+    }
+
+
+    @Test
+    public void testIntLiteral() throws Exception {
+        marshall((KiWiIntLiteral) valueFactory.createLiteral(rnd.nextInt()), new IntLiteralSerializer());
+    }
+
+
+    @Test
+    public void testTriple() throws Exception {
+        KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiNode o = (KiWiNode) randomNode();
+        KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o);
+
+        marshall(t, new TripleSerializer());
+    }
+
+    @Test
+    public void testPrefixCompressedTriple() throws Exception {
+        KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiUriResource o = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+        KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o);
+
+        marshall(t, new TripleSerializer());
+    }
+
+
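+    /**
+     * Serialize the given object to in-memory streams and assert that reading it back yields an equal object.
+     * @param origin       the object to serialize
+     * @param externalizer the serializer under test, which is also invoked directly on the object
+     * @param <T>          the type of the object under test
+     */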
+    private <T> void marshall(T origin, StreamSerializer<T> externalizer) throws IOException, ClassNotFoundException, InterruptedException {
+        log.info("- testing Java ObjectStream ...");
+        ByteArrayOutputStream outBytesOS = new ByteArrayOutputStream();
+        ObjectOutputStream outOS = new ObjectOutputStream(outBytesOS);
+
+        outOS.writeObject(origin);
+
+        outOS.close();
+
+        log.info("  object {}: serialized with {} bytes", origin, outBytesOS.size());
+
+
+        log.info("- testing serializer directly ...");
+        ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+        ObjectDataOutputStream out = new ObjectDataOutputStream(outBytes, simpleService);
+
+        externalizer.write(out, origin);
+        out.close();
+
+        log.info("  object {}: serialized with {} bytes", origin, outBytes.size());
+
+        ByteArrayInputStream inBytes = new ByteArrayInputStream(outBytes.toByteArray());
+        ObjectDataInputStream in = new ObjectDataInputStream(inBytes, simpleService);
+
+        T destination1 = externalizer.read(in);
+
+        Assert.assertEquals(origin,destination1);
+
+
+
+        log.info("- testing serializer with Hazelcast serialization service ...");
+
+
+        ByteArrayOutputStream outBytesFull = new ByteArrayOutputStream();
+        ObjectDataOutputStream outFull = new ObjectDataOutputStream(outBytesFull, fullService);
+
+        fullService.writeObject(outFull, origin);
+        outFull.close();
+
+        log.info("  object {}: serialized with {} bytes", origin, outBytesFull.size());
+
+        ByteArrayInputStream inBytesFull = new ByteArrayInputStream(outBytesFull.toByteArray());
+        ObjectDataInputStream inFull = new ObjectDataInputStream(inBytesFull, fullService);
+
+        T destination2 = (T) fullService.readObject(inFull);
+
+        Assert.assertEquals(origin, destination2);
+
+    }
+
+
+    /**
+     * Return a random RDF value: a URI, a blank node, or a string, int, double or boolean literal (each kind equally likely).
+     * @return
+     */
+    protected Value randomNode() {
+        Value object;
+        switch(rnd.nextInt(6)) {
+            case 0: object = valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+                break;
+            case 1: object = valueFactory.createBNode();
+                break;
+            case 2: object = valueFactory.createLiteral(RandomStringUtils.randomAscii(40));
+                break;
+            case 3: object = valueFactory.createLiteral(rnd.nextInt());
+                break;
+            case 4: object = valueFactory.createLiteral(rnd.nextDouble());
+                break;
+            case 5: object = valueFactory.createLiteral(rnd.nextBoolean());
+                break;
+            default: object = valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+                break;
+
+        }
+        return object;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
index 5818e0b..85ed56b 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
@@ -129,7 +129,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
                 .build();
         cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
 
-        log.info("initialised local cache manager");
+        log.info("initialised Infinispan local cache manager");
     }
 
     /**
@@ -182,7 +182,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
                 .build();
         cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
 
-        log.info("initialised distributed cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
+        log.info("initialised Infinispan distributed cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
 
     }
 
@@ -229,7 +229,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
                 .build();
         cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
 
-        log.info("initialised replicated cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
+        log.info("initialised Infinispan replicated cache manager (cluster name: {})",  globalConfiguration.transport().clusterName());
     }
 
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
deleted file mode 100644
index 981e595..0000000
--- a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.marmotta.kiwi.test;
-
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.marmotta.kiwi.caching.CacheManager;
-import org.apache.marmotta.kiwi.config.KiWiConfiguration;
-import org.apache.marmotta.kiwi.embedded.InfinispanEmbeddedCacheManagerFactory;
-import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
-import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.junit.*;
-import org.openrdf.model.URI;
-import org.openrdf.repository.Repository;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Add file description here!
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class ClusterTest {
-
-    private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
-
-    KiWiConfiguration config1, config2;
-
-    KiWiStore store1, store2;
-
-    Repository repository1, repository2;
-
-    CacheManager cacheManager1, cacheManager2;
-
-    @Before
-    public void setup() throws RepositoryException {
-        config1 = new KiWiConfiguration(
-                "default-H2",
-                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
-                "kiwi", "kiwi",
-                new H2Dialect());
-        config1.setDatacenterId(1);
-        config1.setClustered(true);
-        config1.setCacheManager(InfinispanEmbeddedCacheManagerFactory.class.getName());
-
-        config2 = new KiWiConfiguration(
-                "default-H2",
-                "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
-                "kiwi", "kiwi",
-                new H2Dialect());
-        config2.setDatacenterId(2);
-        config2.setClustered(true);
-        config2.setCacheManager(InfinispanEmbeddedCacheManagerFactory.class.getName());
-
-
-
-    }
-
-    public void setupCluster(int port1, int port2) throws RepositoryException {
-        config1.setClusterPort(port1);
-        config2.setClusterPort(port2);
-
-        store1 = new KiWiStore(config1);
-        store2 = new KiWiStore(config2);
-
-        repository1 = new SailRepository(store1);
-        repository2 = new SailRepository(store2);
-
-        repository1.initialize();
-        repository2.initialize();
-
-        cacheManager1 = store1.getPersistence().getCacheManager();
-        cacheManager2 = store2.getPersistence().getCacheManager();
-    }
-
-
-    @After
-    public void teardown() throws RepositoryException {
-        repository1.shutDown();
-        repository2.shutDown();
-    }
-
-
-    @Test
-    @Ignore
-    public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
-        setupCluster(61222,61222);
-
-        log.info("testing cache synchronization ...");
-
-        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
-        // give the cluster some time to performance asynchronous balancing
-        Thread.sleep(100);
-
-
-        log.debug("test if resource is in cache where it was created ...");
-        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u1);
-        Assert.assertEquals(u,u1);
-
-        log.debug("test if resource has been synced to other cache in cluster ...");
-        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u2);
-        Assert.assertEquals(u,u2);
-    }
-
-    @Test
-    @Ignore
-    public void testDisjointClusters() throws InterruptedException, RepositoryException {
-        setupCluster(61224,61225);
-
-        log.info("testing caches on different ports ...");
-
-        URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
-        // give the cluster some time to performance asynchronous balancing
-        Thread.sleep(100);
-
-        log.debug("test if resource is in cache where it was created ...");
-        URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNotNull(u1);
-        Assert.assertEquals(u,u1);
-
-        log.debug("test if resource has been synced to other cache in cluster ...");
-        URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
-        Assert.assertNull(u2);
-    }
-
-
-    private static Long createCacheKey(String svalue) {
-        Hasher hasher = Hashing.goodFastHash(64).newHasher();
-        hasher.putString(svalue);
-        return hasher.hash().asLong();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/marmotta/blob/62c44dea/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java
new file mode 100644
index 0000000..4733aa9
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/cluster/BaseClusterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test.cluster;
+
+import org.apache.marmotta.kiwi.caching.CacheManager;
+import org.apache.marmotta.kiwi.caching.CacheManagerType;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
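+ * Common base class for cache cluster tests: checks that a URI created in one repository is synchronised to a cache on the same cluster port and is absent from a cache on a different cluster port.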
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public abstract class BaseClusterTest {
+
+    private static Logger log = LoggerFactory.getLogger(BaseClusterTest.class);
+
+    private static int datacenterIds = 1;
+
+    private static Repository repositorySync1, repositorySync2, repositoryAsync1, repositoryAsync2;
+
+    private static CacheManager cacheManagerSync1, cacheManagerSync2, cacheManagerAsync1, cacheManagerAsync2;
+
+
+    @AfterClass
+    public static void teardown() throws RepositoryException {
+        repositorySync1.shutDown();
+        repositorySync2.shutDown();
+        repositoryAsync1.shutDown();
+        repositoryAsync2.shutDown();
+    }
+
+
+    @Test
+    public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
+
+        log.info("testing cache synchronization ...");
+
+        URI u = repositorySync1.getValueFactory().createURI("http://localhost/test1");
+
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(100);
+
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManagerSync1.getUriCache().get("http://localhost/test1");
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManagerSync2.getUriCache().get("http://localhost/test1");
+
+        Assert.assertNotNull(u2);
+        Assert.assertEquals(u,u2);
+    }
+
+    @Test
+    public void testDisjointClusters() throws InterruptedException, RepositoryException {
+
+        log.info("testing caches on different ports ...");
+
+        URI u = repositoryAsync1.getValueFactory().createURI("http://localhost/test1");
+
+
+        // give the cluster some time to perform asynchronous balancing
+        Thread.sleep(100);
+
+        log.debug("test if resource is in cache where it was created ...");
+        URI u1 = (URI) cacheManagerAsync1.getUriCache().get("http://localhost/test1");
+
+        Assert.assertNotNull(u1);
+        Assert.assertEquals(u,u1);
+
+        log.debug("test if resource has been synced to other cache in cluster ...");
+        URI u2 = (URI) cacheManagerAsync2.getUriCache().get("http://localhost/test1");
+
+        Assert.assertNull(u2);
+    }
+
+
+    protected static class ClusterTestSupport {
+
+        CacheManagerType type;
+
+        protected ClusterTestSupport(CacheManagerType type) {
+            this.type = type;
+        }
+
+        public void setup() {
+            try {
+                repositorySync1 = createConfiguration(61222);
+                repositorySync2 = createConfiguration(61222);
+                repositoryAsync1 = createConfiguration(61223);
+                repositoryAsync2 = createConfiguration(61224);
+
+                cacheManagerSync1 = getCacheManager(repositorySync1);
+                cacheManagerSync2 = getCacheManager(repositorySync2);
+                cacheManagerAsync1 = getCacheManager(repositoryAsync1);
+                cacheManagerAsync2 = getCacheManager(repositoryAsync2);
+
+
+            } catch (RepositoryException ex) {
+                Assert.fail(ex.getMessage());
+            }
+        }
+
+
+        private Repository createConfiguration(int port) throws RepositoryException {
+            KiWiConfiguration config = new KiWiConfiguration(
+                    "default-H2",
+                    "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+                    "kiwi", "kiwi",
+                    new H2Dialect());
+            config.setDatacenterId(datacenterIds++);
+            config.setClustered(true);
+            config.setCacheManager(type);
+            config.setClusterPort(port);
+
+            KiWiStore store = new KiWiStore(config);
+
+            Repository repository = new SailRepository(store);
+            repository.initialize();
+
+            return repository;
+        }
+
+        private static CacheManager getCacheManager(Repository repository) {
+            return ((KiWiStore)((SailRepository)repository).getSail()).getPersistence().getCacheManager();
+        }
+
+    }
+}