You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2014/03/05 13:24:20 UTC
[05/21] git commit: work on externalizer
work on externalizer
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/1698c6cb
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/1698c6cb
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/1698c6cb
Branch: refs/heads/MARMOTTA-450
Commit: 1698c6cb722eb5dc0db2a382282458fe7b12b321
Parents: 2e324d8
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Mar 3 17:27:19 2014 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Mar 3 17:27:19 2014 +0100
----------------------------------------------------------------------
.../InfinispanEmbeddedCacheManager.java | 310 ++++++++++---------
.../kiwi/externalizer/TripleExternalizer.java | 120 +++----
.../remote/InfinispanRemoteCacheManager.java | 114 +++++++
.../apache/marmotta/kiwi/test/ClusterTest.java | 160 ++++++++++
.../test/externalizer/ExternalizerTest.java | 27 +-
.../apache/marmotta/kiwi/test/ClusterTest.java | 157 ----------
6 files changed, 521 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
index 35c2018..5818e0b 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/embedded/InfinispanEmbeddedCacheManager.java
@@ -21,7 +21,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.marmotta.kiwi.caching.CacheManager;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.externalizer.*;
-import org.apache.marmotta.kiwi.persistence.KiWiPersistence;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.AdvancedExternalizer;
@@ -44,9 +43,6 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import static org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED;
-import static org.apache.marmotta.kiwi.config.CacheMode.REPLICATED;
-
/**
* A class for managing the different caches that are used by the triple store.
* <p/>
@@ -68,19 +64,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
private EmbeddedCacheManager cacheManager;
- private GlobalConfiguration globalConfiguration;
-
// default configuration: distributed cache, 100000 entries, 300 seconds expiration, 60 seconds idle
private Configuration defaultConfiguration;
- private boolean clustered, embedded;
-
- private KiWiConfiguration kiWiConfiguration;
-
- private KiWiPersistence persistence;
+ private KiWiConfiguration config;
-
- private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, loaderCache, registryCache;
+ private Cache nodeCache, tripleCache, uriCache, literalCache, bnodeCache, nsPrefixCache, nsUriCache, registryCache;
/**
@@ -90,120 +79,169 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
*/
public InfinispanEmbeddedCacheManager(KiWiConfiguration config) {
- this.clustered = config.isClustered();
- this.kiWiConfiguration = config;
-
- if(clustered && (config.getCacheMode() == DISTRIBUTED || config.getCacheMode() == REPLICATED)) {
- try {
- String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
-
- jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
- jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
-
-
- globalConfiguration = new GlobalConfigurationBuilder()
- .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
- .transport()
- .defaultTransport()
- .clusterName(config.getClusterName())
- .machineId("instance-" + config.getDatacenterId())
- .addProperty("configurationXml", jgroupsXml)
- .globalJmxStatistics()
- .jmxDomain("org.apache.marmotta.kiwi")
- .allowDuplicateDomains(true)
- .serialization()
- .addAdvancedExternalizer(getExternalizers())
- .build();
- } catch (IOException ex) {
- log.warn("error loading JGroups configuration from archive: {}", ex.getMessage());
- log.warn("some configuration options will not be available");
-
- globalConfiguration = new GlobalConfigurationBuilder()
- .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
- .transport()
- .defaultTransport()
- .clusterName(config.getClusterName())
- .machineId("instance-" + config.getDatacenterId())
- .addProperty("configurationFile", "jgroups-kiwi.xml")
- .globalJmxStatistics()
- .jmxDomain("org.apache.marmotta.kiwi")
- .allowDuplicateDomains(true)
- .serialization()
- .addAdvancedExternalizer(getExternalizers())
- .build();
- }
+ this.config = config;
- if(config.getCacheMode() == DISTRIBUTED) {
- defaultConfiguration = new ConfigurationBuilder()
- .clustering()
- .cacheMode(CacheMode.DIST_ASYNC)
- .async()
- .asyncMarshalling()
- .l1()
- .lifespan(5, TimeUnit.MINUTES)
- .hash()
- .numOwners(2)
- .numSegments(40)
- .consistentHashFactory(new SyncConsistentHashFactory())
- .stateTransfer()
- .fetchInMemoryState(false)
- .eviction()
- .strategy(EvictionStrategy.LIRS)
- .maxEntries(100000)
- .expiration()
- .lifespan(30, TimeUnit.MINUTES)
- .maxIdle(10, TimeUnit.MINUTES)
- .build();
- } else {
- defaultConfiguration = new ConfigurationBuilder()
- .clustering()
- .cacheMode(CacheMode.REPL_ASYNC)
- .async()
- .asyncMarshalling()
- .stateTransfer()
- .fetchInMemoryState(false)
- .eviction()
- .strategy(EvictionStrategy.LIRS)
- .maxEntries(100000)
- .expiration()
- .lifespan(30, TimeUnit.MINUTES)
- .maxIdle(10, TimeUnit.MINUTES)
- .build();
+ try {
+ switch (config.getCacheMode()) {
+ case DISTRIBUTED:
+ buildDistributedConfiguration(getExternalizers());
+ break;
+ case REPLICATED:
+ buildReplicatedConfiguration(getExternalizers());
+ break;
+ case LOCAL:
+ buildLocalConfiguration();
+ break;
}
- } else {
- globalConfiguration = new GlobalConfigurationBuilder()
- .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
- .globalJmxStatistics()
- .jmxDomain("org.apache.marmotta.kiwi")
- .allowDuplicateDomains(true)
- .build();
+ } catch (IOException ex) {
+ log.warn("error while building cache cluster configuration, reverting to local cache");
+ buildLocalConfiguration();
+ }
- defaultConfiguration = new ConfigurationBuilder()
- .clustering()
- .cacheMode(CacheMode.LOCAL)
- .eviction()
- .strategy(EvictionStrategy.LIRS)
- .maxEntries(100000)
- .expiration()
+
+ }
+
+ /**
+ * Build a local cache configuration.
+ * <p/>
+ * In local cache mode, the cache is not shared among the servers in a cluster. Each machine keeps a local cache.
+ * This allows quick startups and eliminates network traffic in the cluster, but subsequent requests to different
+ * cluster members cannot benefit from the cached data.
+ */
+ protected void buildLocalConfiguration() {
+ GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+ .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+ .globalJmxStatistics()
+ .jmxDomain("org.apache.marmotta.kiwi")
+ .allowDuplicateDomains(true)
+ .build();
+
+ defaultConfiguration = new ConfigurationBuilder()
+ .clustering()
+ .cacheMode(CacheMode.LOCAL)
+ .eviction()
+ .strategy(EvictionStrategy.LIRS)
+ .maxEntries(100000)
+ .expiration()
+ .lifespan(5, TimeUnit.MINUTES)
+ .maxIdle(1, TimeUnit.MINUTES)
+ .build();
+ cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
+
+ log.info("initialised local cache manager");
+ }
+
+ /**
+ * Build a distributed cache configuration.
+ * <p/>
+ * In distributed cache mode, the cluster forms a big hash table used as a cache. This allows to make efficient
+ * use of the large amount of memory available, but requires cache rebalancing and a lot of network transfers,
+ * especially in case cluster members are restarted often.
+ */
+ protected void buildDistributedConfiguration(AdvancedExternalizer...externalizers) throws IOException {
+ String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
+
+ jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
+ jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
+
+
+ GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+ .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+ .transport()
+ .defaultTransport()
+ .clusterName(config.getClusterName())
+ .machineId("instance-" + config.getDatacenterId())
+ .addProperty("configurationXml", jgroupsXml)
+ .globalJmxStatistics()
+ .jmxDomain("org.apache.marmotta.kiwi")
+ .allowDuplicateDomains(true)
+ .serialization()
+ .addAdvancedExternalizer(externalizers)
+ .build();
+
+ defaultConfiguration = new ConfigurationBuilder()
+ .clustering()
+ .cacheMode(CacheMode.DIST_ASYNC)
+ .async()
+ .asyncMarshalling()
+ .l1()
.lifespan(5, TimeUnit.MINUTES)
- .maxIdle(1, TimeUnit.MINUTES)
- .build();
+ .hash()
+ .numOwners(2)
+ .numSegments(40)
+ .consistentHashFactory(new SyncConsistentHashFactory())
+ .stateTransfer()
+ .fetchInMemoryState(false)
+ .eviction()
+ .strategy(EvictionStrategy.LIRS)
+ .maxEntries(100000)
+ .expiration()
+ .lifespan(30, TimeUnit.MINUTES)
+ .maxIdle(10, TimeUnit.MINUTES)
+ .build();
+ cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
- }
+ log.info("initialised distributed cache manager (cluster name: {})", globalConfiguration.transport().clusterName());
+ }
+ /**
+ * Build a replicated cache configuration.
+ * <p/>
+ * In replicated cache mode, each node in the cluster has an identical copy of all cache data. This allows
+ * very efficient cache lookups and reduces the rebalancing effort, but requires more memory.
+ */
+ protected void buildReplicatedConfiguration(AdvancedExternalizer...externalizers) throws IOException {
+ String jgroupsXml = IOUtils.toString(InfinispanEmbeddedCacheManager.class.getResourceAsStream("/jgroups-kiwi.xml"));
+
+ jgroupsXml = jgroupsXml.replaceAll("mcast_addr=\"[0-9.]+\"", String.format("mcast_addr=\"%s\"", config.getClusterAddress()));
+ jgroupsXml = jgroupsXml.replaceAll("mcast_port=\"[0-9]+\"", String.format("mcast_port=\"%d\"", config.getClusterPort()));
+
+
+ GlobalConfiguration globalConfiguration = new GlobalConfigurationBuilder()
+ .classLoader(InfinispanEmbeddedCacheManager.class.getClassLoader())
+ .transport()
+ .defaultTransport()
+ .clusterName(config.getClusterName())
+ .machineId("instance-" + config.getDatacenterId())
+ .addProperty("configurationXml", jgroupsXml)
+ .globalJmxStatistics()
+ .jmxDomain("org.apache.marmotta.kiwi")
+ .allowDuplicateDomains(true)
+ .serialization()
+ .addAdvancedExternalizer(externalizers)
+ .build();
+
+ defaultConfiguration = new ConfigurationBuilder()
+ .clustering()
+ .cacheMode(CacheMode.REPL_ASYNC)
+ .async()
+ .asyncMarshalling()
+ .stateTransfer()
+ .fetchInMemoryState(false)
+ .eviction()
+ .strategy(EvictionStrategy.LIRS)
+ .maxEntries(100000)
+ .expiration()
+ .lifespan(30, TimeUnit.MINUTES)
+ .maxIdle(10, TimeUnit.MINUTES)
+ .build();
cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true);
- log.info("initialised Infinispan cache manager ({})", globalConfiguration.isClustered() ? "cluster name: "+globalConfiguration.transport().clusterName() : "single host");
+ log.info("initialised replicated cache manager (cluster name: {})", globalConfiguration.transport().clusterName());
+ }
+
- this.embedded = true;
+ protected boolean isClustered() {
+ return config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.DISTRIBUTED ||
+ config.getCacheMode() == org.apache.marmotta.kiwi.config.CacheMode.REPLICATED;
}
private AdvancedExternalizer[] getExternalizers() {
return new AdvancedExternalizer[] {
- new TripleExternalizer(persistence),
+ new TripleExternalizer(),
new UriExternalizer(),
new BNodeExternalizer(),
new StringLiteralExternalizer(),
@@ -249,7 +287,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
.clustering()
.cacheMode(CacheMode.LOCAL)
.eviction()
- .maxEntries(kiWiConfiguration.getTripleCacheSize())
+ .maxEntries(config.getTripleCacheSize())
.expiration()
.lifespan(60, TimeUnit.MINUTES)
.maxIdle(30, TimeUnit.MINUTES)
@@ -272,7 +310,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
if(uriCache == null) {
Configuration uriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.eviction()
- .maxEntries(kiWiConfiguration.getUriCacheSize())
+ .maxEntries(config.getUriCacheSize())
.build();
cacheManager.defineConfiguration(URI_CACHE, uriConfiguration);
@@ -292,7 +330,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
if(bnodeCache == null) {
Configuration bnodeConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.eviction()
- .maxEntries(kiWiConfiguration.getBNodeCacheSize())
+ .maxEntries(config.getBNodeCacheSize())
.build();
cacheManager.defineConfiguration(BNODE_CACHE, bnodeConfiguration);
@@ -312,7 +350,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
if(literalCache == null) {
Configuration literalConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.eviction()
- .maxEntries(kiWiConfiguration.getLiteralCacheSize())
+ .maxEntries(config.getLiteralCacheSize())
.build();
cacheManager.defineConfiguration(LITERAL_CACHE, literalConfiguration);
@@ -328,12 +366,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
*/
public Cache getNamespaceUriCache() {
if(nsUriCache == null) {
- if(clustered) {
+ if(isClustered()) {
Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.clustering()
.cacheMode(CacheMode.REPL_ASYNC)
.eviction()
- .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+ .maxEntries(config.getNamespaceCacheSize())
.expiration()
.lifespan(1, TimeUnit.DAYS)
.build();
@@ -341,7 +379,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
} else {
Configuration nsuriConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.eviction()
- .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+ .maxEntries(config.getNamespaceCacheSize())
.expiration()
.lifespan(1, TimeUnit.HOURS)
.build();
@@ -359,12 +397,12 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
*/
public Cache getNamespacePrefixCache() {
if(nsPrefixCache == null) {
- if(clustered) {
+ if(isClustered()) {
Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.clustering()
.cacheMode(CacheMode.REPL_ASYNC)
.eviction()
- .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+ .maxEntries(config.getNamespaceCacheSize())
.expiration()
.lifespan(1, TimeUnit.DAYS)
.build();
@@ -373,7 +411,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
} else {
Configuration nsprefixConfiguration = new ConfigurationBuilder().read(defaultConfiguration)
.eviction()
- .maxEntries(kiWiConfiguration.getNamespaceCacheSize())
+ .maxEntries(config.getNamespaceCacheSize())
.expiration()
.lifespan(1, TimeUnit.HOURS)
.build();
@@ -395,7 +433,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
*/
public Cache getRegistryCache() {
if(registryCache == null) {
- if(clustered) {
+ if(isClustered()) {
Configuration registryConfiguration = new ConfigurationBuilder()
.clustering()
.cacheMode(CacheMode.REPL_SYNC)
@@ -436,31 +474,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
}
/**
- * Return the Infinispan cache manager used by the caching infrastructure.
- *
- * @return
- */
- public EmbeddedCacheManager getCacheManager() {
- return cacheManager;
- }
-
- /**
- * Return the global cache manager configuration used by the caching infrastructure.
- * @return
- */
- public GlobalConfiguration getGlobalConfiguration() {
- return globalConfiguration;
- }
-
- /**
- * Return the default cache configuration used by the caching infrastructure.
- * @return
- */
- public Configuration getDefaultConfiguration() {
- return defaultConfiguration;
- }
-
- /**
* Clear all caches managed by this cache manager.
*/
public void clear() {
@@ -479,7 +492,6 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
bnodeCache = null;
nsPrefixCache = null;
nsUriCache = null;
- loaderCache = null;
registryCache = null;
}
@@ -488,7 +500,7 @@ public class InfinispanEmbeddedCacheManager implements CacheManager {
*/
public void shutdown() {
try {
- if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) {
+ if(cacheManager.getStatus() == ComponentStatus.RUNNING) {
log.warn("shutting down cache manager ...");
// if(cacheManager.getTransport() != null) {
// log.info("... shutting down transport ...");
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
index e4c584c..05e6933 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/externalizer/TripleExternalizer.java
@@ -17,21 +17,17 @@
package org.apache.marmotta.kiwi.externalizer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
import org.apache.marmotta.kiwi.model.rdf.KiWiResource;
import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
-import org.apache.marmotta.kiwi.persistence.KiWiConnection;
-import org.apache.marmotta.kiwi.persistence.KiWiPersistence;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.sql.SQLException;
import java.util.Date;
import java.util.Set;
@@ -41,13 +37,10 @@ import java.util.Set;
*/
public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
- private static Logger log = LoggerFactory.getLogger(TripleExternalizer.class);
- private KiWiPersistence persistence;
+ public static final int MODE_DEFAULT = 1;
+ public static final int MODE_PREFIX = 2;
- public TripleExternalizer(KiWiPersistence persistence) {
- this.persistence = persistence;
- }
@Override
public Set<Class<? extends KiWiTriple>> getTypeClasses() {
@@ -62,11 +55,39 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
@Override
public void writeObject(ObjectOutput output, KiWiTriple object) throws IOException {
output.writeLong(object.getId());
- output.writeLong(object.getSubject().getId());
- output.writeLong(object.getPredicate().getId());
- output.writeLong(object.getObject().getId());
- output.writeLong(object.getContext() != null ? object.getContext().getId() : -1L);
- output.writeLong(object.getCreator() != null ? object.getCreator().getId() : -1L);
+
+ // in case subject and object are both uris we use a special prefix-compressed mode
+ if(object.getSubject().isUriResource() && object.getObject().isUriResource()) {
+ String sUri = object.getSubject().stringValue();
+ String oUri = object.getObject().stringValue();
+
+ String prefix = StringUtils.getCommonPrefix(sUri,oUri);
+
+ output.writeByte(MODE_PREFIX);
+ output.writeInt(prefix.length());
+ output.writeChars(prefix);
+
+ output.writeLong(object.getSubject().getId());
+ output.writeInt(sUri.length() - prefix.length());
+ output.writeChars(sUri.substring(prefix.length()));
+ output.writeLong(object.getSubject().getCreated().getTime());
+
+ output.writeObject(object.getPredicate());
+
+ output.writeLong(object.getObject().getId());
+ output.writeInt(oUri.length() - prefix.length());
+ output.writeChars(oUri.substring(prefix.length()));
+ output.writeLong(object.getObject().getCreated().getTime());
+ } else {
+ output.writeByte(MODE_DEFAULT);
+
+ output.writeObject(object.getSubject());
+ output.writeObject(object.getPredicate());
+ output.writeObject(object.getObject());
+ }
+
+ output.writeObject(object.getContext());
+ output.writeObject(object.getCreator());
output.writeBoolean(object.isDeleted());
output.writeBoolean(object.isInferred());
output.writeBoolean(object.isNewTriple());
@@ -80,48 +101,33 @@ public class TripleExternalizer implements AdvancedExternalizer<KiWiTriple> {
@Override
public KiWiTriple readObject(ObjectInput input) throws IOException, ClassNotFoundException {
- try {
- KiWiConnection con = persistence.getConnection();
- try {
- KiWiTriple result = new KiWiTriple();
- result.setId(input.readLong());
-
- long[] nodeIds = new long[5];
- for(int i=0; i<5; i++) {
- nodeIds[0] = input.readLong();
- }
- KiWiNode[] nodes = con.loadNodesByIds(nodeIds);
-
- result.setSubject((KiWiResource) nodes[0]);
- result.setPredicate((KiWiUriResource) nodes[1]);
- result.setObject(nodes[2]);
-
- if(nodes[3] != null) {
- result.setContext((KiWiResource) nodes[3]);
- }
- if(nodes[4] != null) {
- result.setCreator((KiWiResource) nodes[4]);
- }
-
- result.setDeleted(input.readBoolean());
- result.setInferred(input.readBoolean());
- result.setNewTriple(input.readBoolean());
-
- result.setCreated(new Date(input.readLong()));
-
- long deletedAt = input.readLong();
- if(deletedAt > 0) {
- result.setDeletedAt(new Date(deletedAt));
- }
-
-
- return result;
- } finally {
- con.commit();
- con.close();
- }
- } catch (SQLException ex) {
- throw new IOException(ex);
+
+ KiWiTriple result = new KiWiTriple();
+ result.setId(input.readLong());
+
+ int mode = input.readInt();
+ if(mode == MODE_PREFIX) {
+ String prefix =
+ }
+
+
+ result.setSubject((KiWiResource) input.readObject());
+ result.setPredicate((KiWiUriResource) input.readObject());
+ result.setObject((KiWiNode) input.readObject());
+ result.setContext((KiWiResource) input.readObject());
+ result.setCreator((KiWiResource) input.readObject());
+ result.setDeleted(input.readBoolean());
+ result.setInferred(input.readBoolean());
+ result.setNewTriple(input.readBoolean());
+
+ result.setCreated(new Date(input.readLong()));
+
+ long deletedAt = input.readLong();
+ if(deletedAt > 0) {
+ result.setDeletedAt(new Date(deletedAt));
}
+
+
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
index c40feb8..6f5034a 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/main/java/org/apache/marmotta/kiwi/remote/InfinispanRemoteCacheManager.java
@@ -19,6 +19,9 @@ package org.apache.marmotta.kiwi.remote;
import org.apache.marmotta.kiwi.caching.CacheManager;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.model.rdf.*;
+
+import java.util.Map;
/**
* Implementation of an Infinispan cache manager with a remote (client-server) cache.
@@ -34,7 +37,118 @@ public class InfinispanRemoteCacheManager implements CacheManager {
}
+ /**
+ * Return the node id -> node cache from the cache manager. This cache is heavily used to lookup
+ * nodes when querying or loading triples and should therefore have a decent size (default 500.000 elements).
+ *
+ * @return an EHCache Cache instance containing the node id -> node mappings
+ */
+ @Override
+ public Map<Long, KiWiNode> getNodeCache() {
+ return null;
+ }
+
+ /**
+ * Return the triple id -> triple cache from the cache manager. This cache is used for speeding up the
+ * construction of query results.
+ *
+ * @return
+ */
+ @Override
+ public Map<Long, KiWiTriple> getTripleCache() {
+ return null;
+ }
+ /**
+ * Return the uri -> KiWiUriResource cache from the cache manager. This cache is used when constructing new
+ * KiWiUriResources to avoid a database lookup.
+ *
+ * @return
+ */
+ @Override
+ public Map<String, KiWiUriResource> getUriCache() {
+ return null;
+ }
+ /**
+ * Return the anonId -> KiWiAnonResource cache from the cache manager. This cache is used when constructing new
+ * KiWiAnonResources to avoid a database lookup.
+ *
+ * @return
+ */
+ @Override
+ public Map<String, KiWiAnonResource> getBNodeCache() {
+ return null;
+ }
+ /**
+ * Return the literal cache key -> KiWiLiteral cache from the cache manager. This cache is used when constructing new
+ * KiWiLiterals to avoid a database lookup.
+ *
+ * @return
+ * @see org.apache.marmotta.commons.sesame.model.LiteralCommons#createCacheKey(String, java.util.Locale, String)
+ */
+ @Override
+ public Map<String, KiWiLiteral> getLiteralCache() {
+ return null;
+ }
+
+ /**
+ * Return the URI -> namespace cache from the cache manager. Used for looking up namespaces
+ *
+ * @return
+ */
+ @Override
+ public Map<String, KiWiNamespace> getNamespaceUriCache() {
+ return null;
+ }
+
+ /**
+ * Return the prefix -> namespace cache from the cache manager. Used for looking up namespaces
+ *
+ * @return
+ */
+ @Override
+ public Map<String, KiWiNamespace> getNamespacePrefixCache() {
+ return null;
+ }
+
+ /**
+ * Create and return the cache used by the CacheTripleRegistry. This is an unlimited synchronous replicated
+ * cache and should be used with care.
+ *
+ * @return
+ */
+ @Override
+ public Map<Long, Long> getRegistryCache() {
+ return null;
+ }
+
+ /**
+ * Get the cache with the given name from the cache manager. Can be used to request additional
+ * caches from the cache manager that are not covered by explicit methods.
+ *
+ * @param name
+ * @return
+ */
+ @Override
+ public Map getCacheByName(String name) {
+ return null;
+ }
+
+ /**
+ * Clear all caches managed by this cache manager.
+ */
+ @Override
+ public void clear() {
+
+ }
+
+ /**
+ * Shutdown this cache manager instance. Will shutdown the underlying EHCache cache manager.
+ */
+ @Override
+ public void shutdown() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
new file mode 100644
index 0000000..0b6823c
--- /dev/null
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.marmotta.kiwi.caching.CacheManager;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.embedded.InfinispanEmbeddedCacheManagerFactory;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.junit.*;
+import org.openrdf.model.URI;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class ClusterTest {
+
+ private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
+
+ KiWiConfiguration config1, config2;
+
+ KiWiStore store1, store2;
+
+ Repository repository1, repository2;
+
+ CacheManager cacheManager1, cacheManager2;
+
+ @Before
+ public void setup() throws RepositoryException {
+ config1 = new KiWiConfiguration(
+ "default-H2",
+ "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+ "kiwi", "kiwi",
+ new H2Dialect());
+ config1.setDatacenterId(1);
+ config1.setClustered(true);
+ config1.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName());
+
+ config2 = new KiWiConfiguration(
+ "default-H2",
+ "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
+ "kiwi", "kiwi",
+ new H2Dialect());
+ config2.setDatacenterId(2);
+ config2.setClustered(true);
+ config2.setCacheManagerFactory(InfinispanEmbeddedCacheManagerFactory.class.getName());
+
+
+
+ }
+
+ public void setupCluster(int port1, int port2) throws RepositoryException {
+ config1.setClusterPort(port1);
+ config2.setClusterPort(port2);
+
+ store1 = new KiWiStore(config1);
+ store2 = new KiWiStore(config2);
+
+ repository1 = new SailRepository(store1);
+ repository2 = new SailRepository(store2);
+
+ repository1.initialize();
+ repository2.initialize();
+
+ cacheManager1 = store1.getPersistence().getCacheManager();
+ cacheManager2 = store2.getPersistence().getCacheManager();
+ }
+
+
+ @After
+ public void teardown() throws RepositoryException {
+ repository1.shutDown();
+ repository2.shutDown();
+ }
+
+
+ @Test
+ @Ignore
+ public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
+ setupCluster(61222,61222);
+
+ log.info("testing cache synchronization ...");
+
+ URI u = repository1.getValueFactory().createURI("http://localhost/test1");
+
+
+ // give the cluster some time to performance asynchronous balancing
+ Thread.sleep(100);
+
+
+ log.debug("test if resource is in cache where it was created ...");
+ URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+ Assert.assertNotNull(u1);
+ Assert.assertEquals(u,u1);
+
+ log.debug("test if resource has been synced to other cache in cluster ...");
+ URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+ Assert.assertNotNull(u2);
+ Assert.assertEquals(u,u2);
+ }
+
+ @Test
+ @Ignore
+ public void testDisjointClusters() throws InterruptedException, RepositoryException {
+ setupCluster(61224,61225);
+
+ log.info("testing caches on different ports ...");
+
+ URI u = repository1.getValueFactory().createURI("http://localhost/test1");
+
+
+ // give the cluster some time to performance asynchronous balancing
+ Thread.sleep(100);
+
+ log.debug("test if resource is in cache where it was created ...");
+ URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+ Assert.assertNotNull(u1);
+ Assert.assertEquals(u,u1);
+
+ log.debug("test if resource has been synced to other cache in cluster ...");
+ URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
+
+ Assert.assertNull(u2);
+ }
+
+
+ private static Long createCacheKey(String svalue) {
+ Hasher hasher = Hashing.goodFastHash(64).newHasher();
+ hasher.putString(svalue);
+ return hasher.hash().asLong();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
index fe7b701..9f52813 100644
--- a/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
+++ b/libraries/kiwi/kiwi-caching-infinispan/src/test/java/org/apache/marmotta/kiwi/test/externalizer/ExternalizerTest.java
@@ -18,10 +18,8 @@
package org.apache.marmotta.kiwi.test.externalizer;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.externalizer.*;
+import org.apache.marmotta.kiwi.model.rdf.*;
import org.apache.marmotta.kiwi.test.TestValueFactory;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.StreamingMarshaller;
@@ -123,6 +121,16 @@ public class ExternalizerTest {
}
+ @Test
+ public void testTriple() throws Exception {
+ KiWiUriResource s = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+ KiWiUriResource p = (KiWiUriResource) valueFactory.createURI("http://localhost/" + RandomStringUtils.randomAlphanumeric(8));
+ KiWiNode o = (KiWiNode) randomNode();
+ KiWiTriple t = (KiWiTriple) valueFactory.createStatement(s,p,o);
+
+ marshall(t, new TripleExternalizer());
+ }
+
/**
* Run the given object through the marshaller using an in-memory stream.
* @param origin
@@ -130,6 +138,17 @@ public class ExternalizerTest {
* @return
*/
private <T> void marshall(T origin, AdvancedExternalizer<T> externalizer) throws IOException, ClassNotFoundException, InterruptedException {
+ log.info("- testing Java ObjectStream ...");
+ ByteArrayOutputStream outBytesOS = new ByteArrayOutputStream();
+ ObjectOutputStream outOS = new ObjectOutputStream(outBytesOS);
+
+ outOS.writeObject(origin);
+
+ outOS.close();
+
+ log.info(" object {}: serialized with {} bytes", origin, outBytesOS.size());
+
+
log.info("- testing externalizer directly ...");
ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(outBytes);
http://git-wip-us.apache.org/repos/asf/marmotta/blob/1698c6cb/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
deleted file mode 100644
index 5f4d719..0000000
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/ClusterTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.marmotta.kiwi.test;
-
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
-import org.apache.marmotta.kiwi.config.KiWiConfiguration;
-import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
-import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.junit.*;
-import org.openrdf.model.URI;
-import org.openrdf.repository.Repository;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Add file description here!
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class ClusterTest {
-
- private static Logger log = LoggerFactory.getLogger(ClusterTest.class);
-
- KiWiConfiguration config1, config2;
-
- KiWiStore store1, store2;
-
- Repository repository1, repository2;
-
- KiWiCacheManager cacheManager1, cacheManager2;
-
- @Before
- public void setup() throws RepositoryException {
- config1 = new KiWiConfiguration(
- "default-H2",
- "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
- "kiwi", "kiwi",
- new H2Dialect());
- config1.setDatacenterId(1);
- config1.setClustered(true);
-
- config2 = new KiWiConfiguration(
- "default-H2",
- "jdbc:h2:mem:kiwitest;MVCC=true;DB_CLOSE_ON_EXIT=TRUE;DB_CLOSE_DELAY=-1",
- "kiwi", "kiwi",
- new H2Dialect());
- config2.setDatacenterId(2);
- config2.setClustered(true);
-
-
-
- }
-
- public void setupCluster(int port1, int port2) throws RepositoryException {
- config1.setClusterPort(port1);
- config2.setClusterPort(port2);
-
- store1 = new KiWiStore(config1);
- store2 = new KiWiStore(config2);
-
- repository1 = new SailRepository(store1);
- repository2 = new SailRepository(store2);
-
- repository1.initialize();
- repository2.initialize();
-
- cacheManager1 = store1.getPersistence().getCacheManager();
- cacheManager2 = store2.getPersistence().getCacheManager();
- }
-
-
- @After
- public void teardown() throws RepositoryException {
- repository1.shutDown();
- repository2.shutDown();
- }
-
-
- @Test
- @Ignore
- public void testClusteredCacheSync() throws InterruptedException, RepositoryException {
- setupCluster(61222,61222);
-
- log.info("testing cache synchronization ...");
-
- URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
- // give the cluster some time to performance asynchronous balancing
- Thread.sleep(100);
-
-
- log.debug("test if resource is in cache where it was created ...");
- URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
- Assert.assertNotNull(u1);
- Assert.assertEquals(u,u1);
-
- log.debug("test if resource has been synced to other cache in cluster ...");
- URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
- Assert.assertNotNull(u2);
- Assert.assertEquals(u,u2);
- }
-
- @Test
- @Ignore
- public void testDisjointClusters() throws InterruptedException, RepositoryException {
- setupCluster(61224,61225);
-
- log.info("testing caches on different ports ...");
-
- URI u = repository1.getValueFactory().createURI("http://localhost/test1");
-
-
- // give the cluster some time to performance asynchronous balancing
- Thread.sleep(100);
-
- log.debug("test if resource is in cache where it was created ...");
- URI u1 = (URI) cacheManager1.getUriCache().get(createCacheKey("http://localhost/test1"));
-
- Assert.assertNotNull(u1);
- Assert.assertEquals(u,u1);
-
- log.debug("test if resource has been synced to other cache in cluster ...");
- URI u2 = (URI) cacheManager2.getUriCache().get(createCacheKey("http://localhost/test1"));
-
- Assert.assertNull(u2);
- }
-
-
- private static Long createCacheKey(String svalue) {
- Hasher hasher = Hashing.goodFastHash(64).newHasher();
- hasher.putString(svalue);
- return hasher.hash().asLong();
- }
-
-}