You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/06/21 17:11:41 UTC
[2/5] incubator-rya git commit: Included project rya.reasoning.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
new file mode 100644
index 0000000..714ad1c
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
@@ -0,0 +1,278 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.LocalReasoner;
+import mvm.rya.reasoning.LocalReasoner.Relevance;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Resource;
+
+public class ForwardChain extends AbstractReasoningTool {
+ @Override
+ protected void configureReasoningJob(String[] args) throws Exception {
+ distributeSchema();
+ Configuration conf = job.getConfiguration();
+ // We can ignore irrelevant triples, unless the schema has just changed
+ // and therefore we can't rely on previous determinations of relevance.
+ configureMultipleInput(TableMapper.class, RdfMapper.class,
+ FileMapper.class, !MRReasoningUtils.isSchemaNew(conf));
+ job.setMapOutputKeyClass(ResourceWritable.class);
+ job.setMapOutputValueClass(Fact.class);
+ job.setReducerClass(ReasoningReducer.class);
+ job.setSortComparatorClass(ResourceWritable.SecondaryComparator.class);
+ job.setGroupingComparatorClass(ResourceWritable.PrimaryComparator.class);
+ configureDerivationOutput(true);
+ }
+
+ /**
+  * Command-line entry point: run this tool through ToolRunner and exit
+  * with the status it returns.
+  */
+ public static void main(String[] args) throws Exception {
+     final int exitCode = ToolRunner.run(new ForwardChain(), args);
+     System.exit(exitCode);
+ }
+
+ /**
+  * Base mapper that decides whether an input fact is emitted and under
+  * which key(s). Subclasses adapt the different input sources and hand
+  * each fact to {@link #process}.
+  */
+ public static class ForwardChainMapper<K, V> extends Mapper<K, V,
+         ResourceWritable, Fact> {
+     protected Schema schema;
+     protected ResourceWritable node = new ResourceWritable();
+     protected MultipleOutputs<?, ?> debugOut;
+     protected boolean debug;
+     // Reused for every debug record.
+     private Text debugKey = new Text();
+     private Text debugValue = new Text();
+
+     /** Use the given schema instead of loading one during setup. */
+     public ForwardChainMapper(Schema s) {
+         this.schema = s;
+     }
+
+     /** Default constructor; the schema is loaded during setup. */
+     public ForwardChainMapper() {}
+
+     /**
+      * Open the debug output, load the schema if none was supplied, and
+      * read the debug flag from the configuration.
+      */
+     @Override
+     protected void setup(Context context) {
+         debugOut = new MultipleOutputs<>(context);
+         final Configuration conf = context.getConfiguration();
+         if (schema == null) {
+             schema = MRReasoningUtils.loadSchema(conf);
+         }
+         debug = MRReasoningUtils.debug(conf);
+     }
+
+     /** Close the debug output, if it was opened. */
+     @Override
+     public void cleanup(Context context) throws IOException,
+             InterruptedException {
+         if (debugOut != null) {
+             debugOut.close();
+         }
+     }
+
+     /**
+      * Emit the fact once per node it is relevant to: keyed by its subject
+      * with sort key 1, and/or keyed by its object with sort key -1.
+      */
+     protected void process(Context context, Fact inputTriple)
+             throws IOException, InterruptedException {
+         final Relevance relevance =
+             LocalReasoner.relevantFact(inputTriple, schema);
+         if (relevance.subject()) {
+             emit(context, inputTriple, inputTriple.getSubject(), 1);
+         }
+         if (relevance.object()) {
+             // The cast is only reached when the object was reported relevant.
+             emit(context, inputTriple,
+                 (Resource) inputTriple.getObject(), -1);
+         }
+     }
+
+     /**
+      * Write the fact under the given resource and sort key; when debugging,
+      * also write a record to the debug output.
+      */
+     private void emit(Context context, Fact fact, Resource target,
+             int sortKey) throws IOException, InterruptedException {
+         node.set(target, sortKey);
+         context.write(node, fact);
+         if (!debug) {
+             return;
+         }
+         final int iteration = fact.getIteration();
+         debugKey.set("MAP_OUT" + node.toString());
+         debugValue.set(fact.explain(false) + "[" + iteration + "]");
+         debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey, debugValue);
+     }
+ }
+
+ /**
+  * Mapper for input read from the database: each Accumulo key/value pair
+  * is converted to a statement and processed as a fact.
+  */
+ public static class TableMapper extends ForwardChainMapper<Key, Value> {
+     // Reused for every input row.
+     private Fact inputTriple = new Fact();
+
+     public TableMapper() {
+         super();
+     }
+
+     public TableMapper(Schema s) {
+         super(s);
+     }
+
+     @Override
+     public void map(Key row, Value data, Context context)
+             throws IOException, InterruptedException {
+         final Configuration conf = context.getConfiguration();
+         inputTriple.setTriple(MRReasoningUtils.getStatement(row, data, conf));
+         process(context, inputTriple);
+     }
+ }
+
+ /**
+ * Get intermediate data from a sequence file
+ */
+ public static class FileMapper extends ForwardChainMapper<Fact,
+ NullWritable> {
+ public FileMapper() { super(); }
+ public FileMapper(Schema s) { super(s); }
+ @Override
+ public void map(Fact inputTriple, NullWritable nw,
+ Context context) throws IOException, InterruptedException {
+ process(context, inputTriple);
+ }
+ }
+
+ /**
+  * Mapper for input read from an RDF file: the statement wrapped in each
+  * RyaStatementWritable is processed as a fact.
+  */
+ public static class RdfMapper extends ForwardChainMapper<LongWritable,
+         RyaStatementWritable> {
+     // Reused for every input record.
+     private Fact inputTriple = new Fact();
+
+     public RdfMapper() {
+         super();
+     }
+
+     public RdfMapper(Schema s) {
+         super(s);
+     }
+
+     @Override
+     public void map(LongWritable key, RyaStatementWritable rsw,
+             Context context) throws IOException, InterruptedException {
+         // The LongWritable key is not used.
+         this.inputTriple.setTriple(rsw.getRyaStatement());
+         this.process(context, this.inputTriple);
+     }
+ }
+
+ /**
+  * Reducer that feeds all facts grouped under one node to a LocalReasoner
+  * and writes the resulting new facts and inconsistencies to named outputs.
+  */
+ public static class ReasoningReducer extends Reducer<ResourceWritable,
+         Fact, Fact, NullWritable> {
+     private static final int LOG_INTERVAL = 5000;
+     private static final Logger log =
+         Logger.getLogger(ReasoningReducer.class);
+     private MultipleOutputs<?, ?> mout;
+     private Schema schema;
+     private boolean debug;
+     // Reused for every debug record.
+     private Text debugK = new Text();
+     private Text debugV = new Text();
+     // Largest number of stored input facts seen for any node, and that node.
+     private int maxStored = 0;
+     private String maxNode = "";
+
+     /** Use the given schema instead of loading one during setup. */
+     public ReasoningReducer(Schema s) {
+         this.schema = s;
+     }
+
+     /** Default constructor; the schema is loaded during setup. */
+     public ReasoningReducer() {}
+
+     /**
+      * Open the named outputs, load the schema if none was supplied, and
+      * read the debug flag from the configuration.
+      */
+     @Override
+     public void setup(Context context) {
+         mout = new MultipleOutputs<>(context);
+         Configuration conf = context.getConfiguration();
+         if (schema == null) {
+             schema = MRReasoningUtils.loadSchema(conf);
+         }
+         debug = MRReasoningUtils.debug(conf);
+     }
+
+     /**
+      * Close the named outputs and log the storage high-water mark. The
+      * summary is logged even if closing the outputs throws.
+      */
+     @Override
+     public void cleanup(Context context) throws IOException,
+             InterruptedException {
+         try {
+             if (mout != null) {
+                 mout.close();
+             }
+         }
+         finally {
+             log.info("Most input triples stored at one time by any reasoner: "
+                 + maxStored + " (reasoner for node: " + maxNode + ")");
+         }
+     }
+
+     /**
+      * Run a reasoner for one node over all of its facts, writing results
+      * as they become available.
+      */
+     @Override
+     public void reduce(ResourceWritable key, Iterable<Fact> facts,
+             Context context) throws IOException, InterruptedException {
+         log.debug("Reasoning for node " + key.toString());
+         // If the schema was just updated, all facts are potentially
+         // meaningful again. Otherwise, any new derivation must use at
+         // least one fact from the previous (or this) iteration.
+         Configuration conf = context.getConfiguration();
+         LocalReasoner reasoner = new LocalReasoner(key.get(), schema,
+             MRReasoningUtils.getCurrentIteration(conf),
+             MRReasoningUtils.lastSchemaUpdate(conf));
+         long numInput = 0;
+         long numOutput = 0;
+         for (Fact fact : facts) {
+             if (debug) {
+                 debugK.set("INPUT<" + key.get().stringValue() + ">");
+                 debugV.set(fact.toString());
+                 mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+             }
+             // We actually need separate fact objects, as the reasoner might
+             // store them (default is to reuse the same object each time)
+             reasoner.processFact(fact.clone());
+             numInput++;
+             numOutput += handleResults(reasoner, context);
+             if (numInput % LOG_INTERVAL == 0) {
+                 log.debug(reasoner.getDiagnostics());
+                 log.debug(numInput + " input triples so far");
+                 log.debug(numOutput + " output triples/inconsistencies so far");
+             }
+         }
+         // NOTE(review): the return value is discarded; presumably this call
+         // triggers remaining type inferences once all facts are seen -- confirm.
+         reasoner.getTypes();
+         numOutput += handleResults(reasoner, context);
+         int numStored = reasoner.getNumStored();
+         if (numStored > maxStored) {
+             maxStored = numStored;
+             maxNode = key.toString();
+         }
+         log.debug("..." + numStored + " input facts stored in memory");
+     }
+
+     /**
+      * Process any new results from a reasoner.
+      *
+      * @return the number of facts plus inconsistencies written
+      */
+     private long handleResults(LocalReasoner reasoner, Context context)
+             throws IOException, InterruptedException {
+         long numOutput = 0;
+         if (reasoner.hasNewFacts()) {
+             for (Fact fact : reasoner.getFacts()) {
+                 mout.write(getOutputName(fact), fact, NullWritable.get());
+                 numOutput++;
+                 if (debug) {
+                     debugK.set("OUTPUT<" + reasoner.getNode().stringValue() + ">");
+                     debugV.set(fact.explain(false));
+                     mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+                 }
+             }
+         }
+         if (reasoner.hasInconsistencies()) {
+             for (Derivation inconsistency : reasoner.getInconsistencies()) {
+                 mout.write(getOutputName(inconsistency), inconsistency,
+                     NullWritable.get());
+                 numOutput++;
+                 if (debug) {
+                     debugK.set("OUTPUT<" + inconsistency.getNode().stringValue() + ">");
+                     debugV.set(inconsistency.explain(false));
+                     mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+                 }
+             }
+         }
+         return numOutput;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
new file mode 100644
index 0000000..3bed4ca
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
@@ -0,0 +1,336 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Convenience methods for MapReduce reasoning tasks and options.
+ */
+public class MRReasoningUtils {
+ // Configuration variables
+ public static final String WORKING_DIR = "reasoning.workingDir";
+ public static final String LOCAL_INPUT = "reasoning.inputLocal";
+ public static final String DEBUG_FLAG = "reasoning.debug";
+ public static final String OUTPUT_FLAG = "reasoning.output";
+ public static final String STATS_FLAG = "reasoning.stats";
+
+ // Variables used to pass information from drivers to jobs
+ public static final String STEP_PROP = "reasoning.step";
+ public static final String SCHEMA_UPDATE_PROP = "reasoning.schemaUpdate";
+
+ // Used to construct input/output directories
+ static final String OUTPUT_BASE = "step-";
+ static final String SCHEMA_BASE = "schema-";
+ static final String TEMP_SUFFIX = "a";
+ // Named outputs for different kinds of facts
+ static final String SCHEMA_OUT = "schema";
+ static final String INCONSISTENT_OUT = "inconsistencies";
+ static final String TERMINAL_OUT = "instance";
+ static final String INTERMEDIATE_OUT = "intermediate";
+ static final String DEBUG_OUT = "debug";
+
+ /**
+  * Load serialized schema information from a file.
+  *
+  * If the schema path is a directory, the first non-empty regular file
+  * whose name does not start with the debug prefix is read.
+  * NOTE(review): if no entry qualifies, the last listed entry is what gets
+  * opened -- confirm that is acceptable.
+  *
+  * @param conf configuration used to locate the schema and file system
+  * @return the schema read from the file; if an IOException occurs, the
+  *         stack trace is printed and whatever state the schema object
+  *         holds at that point is returned
+  */
+ public static Schema loadSchema(Configuration conf) {
+     SchemaWritable schema = new SchemaWritable();
+     try {
+         FileSystem fs = FileSystem.get(conf);
+         Path schemaPath = getSchemaPath(conf);
+         if (fs.isDirectory(schemaPath)) {
+             for (FileStatus status : fs.listStatus(schemaPath)) {
+                 schemaPath = status.getPath();
+                 if (status.isFile() && status.getLen() > 0
+                     && !schemaPath.getName().startsWith(DEBUG_OUT)) {
+                     break;
+                 }
+             }
+         }
+         // try-with-resources closes the reader even if next() throws.
+         try (SequenceFile.Reader in = new SequenceFile.Reader(conf,
+                 SequenceFile.Reader.file(schemaPath))) {
+             NullWritable key = NullWritable.get();
+             in.next(key, schema);
+         }
+     }
+     catch (IOException e) {
+         e.printStackTrace();
+     }
+     return schema;
+ }
+
+ /**
+ * Record that the schema was updated at this iteration.
+ */
+ static void schemaUpdated(Configuration conf) {
+ conf.setInt(SCHEMA_UPDATE_PROP, getCurrentIteration(conf));
+ }
+
+ /**
+ * Mark the beginning of the next iteration.
+ */
+ static void nextIteration(Configuration conf) {
+ conf.setInt(STEP_PROP, getCurrentIteration(conf)+1);
+ }
+
+ /**
+ * Convert an Accumulo row to a RyaStatement.
+ */
+ static RyaStatement getStatement(Key row, Value data, Configuration conf) {
+ try {
+ RyaTripleContext ryaContext = RyaTripleContext.getInstance(
+ new AccumuloRdfConfiguration(conf));
+ RyaStatement ryaStatement = ryaContext.deserializeTriple(
+ RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
+ new TripleRow(row.getRow().getBytes(), row.getColumnFamily().getBytes(),
+ row.getColumnQualifier().getBytes(), row.getTimestamp(),
+ row.getColumnVisibility().getBytes(), data.get()));
+ return ryaStatement;
+ }
+ catch (TripleRowResolverException e) {
+ e.printStackTrace();
+ System.err.println("row: " + row);
+ return null;
+ }
+ catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ System.err.println("row: " + row);
+ throw e;
+ }
+ }
+
+ /**
+ * Clean up intermediate data, unless debug=true
+ */
+ static void clean(Configuration conf) throws IOException {
+ if (!debug(conf)) {
+ int iteration = getCurrentIteration(conf);
+ for (int i = 0; i <= iteration; i++) {
+ deleteIfExists(conf, OUTPUT_BASE + i);
+ deleteIfExists(conf, OUTPUT_BASE + i + TEMP_SUFFIX);
+ deleteIfExists(conf, SCHEMA_BASE + i);
+ }
+ deleteIfExists(conf, "input");
+ }
+ }
+
+ /**
+ * If a local input path was given, upload it to HDFS and configure file
+ * input. Useful for automating tests against small inputs.
+ */
+ static boolean uploadIfNecessary(Configuration conf)
+ throws IOException {
+ String local = conf.get(LOCAL_INPUT);
+ if (local == null) {
+ return false;
+ }
+ FileSystem fs = FileSystem.get(conf);
+ String current = new File("").getAbsolutePath();
+ Path sourcePath = new Path(current, local);
+ Path destPath = getOutputPath(conf, "input");
+ fs.copyFromLocalFile(false, true, sourcePath, destPath);
+ conf.set(MRUtils.INPUT_PATH, destPath.toString());
+ return true;
+ }
+
+ /**
+ * Delete an HDFS directory if it exists
+ */
+ static void deleteIfExists(Configuration conf, String rel)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path path = getOutputPath(conf, rel);
+ if (fs.isDirectory(path) || fs.isFile(path)) {
+ fs.delete(path, true);
+ }
+ }
+
+ /**
+ * Get a Repository from the configuration variables
+ */
+ static RyaSailRepository getRepository(Configuration conf)
+ throws AccumuloException, AccumuloSecurityException {
+ boolean mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, false);
+ String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+ String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+ String password = conf.get(MRUtils.AC_PWD_PROP, "root");
+ Instance accumulo;
+ if (mock) {
+ accumulo = new MockInstance(instance);
+ }
+ else {
+ String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+ accumulo = new ZooKeeperInstance(instance, zookeepers);
+ }
+ Connector connector = accumulo.getConnector(username, new PasswordToken(password));
+ AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf);
+ aconf.setTablePrefix(conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+ RdfCloudTripleStoreConstants.TBL_PRFX_DEF));
+ AccumuloRyaDAO dao = new AccumuloRyaDAO();
+ dao.setConnector(connector);
+ dao.setConf(aconf);
+ RdfCloudTripleStore store = new RdfCloudTripleStore();
+ store.setRyaDAO(dao);
+ return new RyaSailRepository(store);
+ }
+
+ /**
+ * Set up a MapReduce Job to use Accumulo as input.
+ */
+ static void configureAccumuloInput(Job job)
+ throws AccumuloSecurityException {
+ Configuration conf = job.getConfiguration();
+ String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+ String password = conf.get(MRUtils.AC_PWD_PROP, "");
+ String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+ String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+ Authorizations auths;
+ String auth = conf.get(MRUtils.AC_AUTH_PROP);
+ if (auth != null) {
+ auths = new Authorizations(auth.split(","));
+ }
+ else {
+ auths = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+ }
+ AccumuloInputFormat.setZooKeeperInstance(job,
+ ClientConfiguration.loadDefault()
+ .withInstance(instance).withZkHosts(zookeepers));
+ AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+ AccumuloInputFormat.setInputTableName(job, getTableName(conf));
+ AccumuloInputFormat.setScanAuthorizations(job, auths);
+ }
+
+ /**
+ * Get the table name that will be used for Accumulo input.
+ */
+ static String getTableName(Configuration conf) {
+ String layout = conf.get(MRUtils.TABLE_LAYOUT_PROP,
+ RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO.toString());
+ String prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+ RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+ return RdfCloudTripleStoreUtils.layoutPrefixToTable(
+ RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(layout), prefix);
+ }
+
+ /**
+ * Whether we should output the final inferences.
+ */
+ static boolean shouldOutput(Configuration conf) {
+ return conf.getBoolean(OUTPUT_FLAG, true);
+ }
+
+ /**
+ * Return whether debug flag is on.
+ */
+ static boolean debug(Configuration conf) {
+ return conf.getBoolean(DEBUG_FLAG, false);
+ }
+
+ /**
+ * Return whether detailed statistics should be printed.
+ */
+ static boolean stats(Configuration conf) {
+ return conf.getBoolean(STATS_FLAG, false);
+ }
+
+ /**
+ * Get the Path for RDF file input, or null if not given.
+ */
+ static Path getInputPath(Configuration conf) {
+ String in = conf.get(MRUtils.INPUT_PATH);
+ if (in == null) {
+ return null;
+ }
+ return new Path(in);
+ }
+
+ /**
+ * Get the full output path for a configuration and relative pathname.
+ */
+ static Path getOutputPath(Configuration conf, String name) {
+ String root = conf.get(WORKING_DIR, "tmp/reasoning");
+ return new Path(root + "/" + name);
+ }
+
+ /**
+ * Get the path to the Schema.
+ */
+ static Path getSchemaPath(Configuration conf) {
+ int iteration = lastSchemaUpdate(conf);
+ return getOutputPath(conf, SCHEMA_BASE + iteration);
+ }
+
+ /**
+  * Get the current iteration number (0 if none has been recorded), useful
+  * for keeping track of when facts were generated.
+  */
+ public static int getCurrentIteration(Configuration conf) {
+     final int step = conf.getInt(STEP_PROP, 0);
+     return step;
+ }
+
+ /**
+ * Get the time of the last change to the schema.
+ */
+ static int lastSchemaUpdate(Configuration conf) {
+ return conf.getInt(SCHEMA_UPDATE_PROP, 0);
+ }
+
+ /**
+  * True if the last schema update was recorded at the iteration
+  * immediately before the current one.
+  */
+ public static boolean isSchemaNew(Configuration conf) {
+     final int previousIteration = getCurrentIteration(conf) - 1;
+     return lastSchemaUpdate(conf) == previousIteration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
new file mode 100644
index 0000000..527b887
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
@@ -0,0 +1,110 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Collect inferred triples and detected inconsistencies as text.
+ */
+public class OutputTool extends AbstractReasoningTool {
+ @Override
+ protected void configureReasoningJob(String[] args) throws Exception {
+ MRReasoningUtils.deleteIfExists(job.getConfiguration(), "final");
+ configureFileInput(FactMapper.class, InconsistencyMapper.class, false);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setNumReduceTasks(1);
+ job.setReducerClass(OutputReducer.class);
+ configureTextOutput("final");
+ }
+
+ /**
+  * Command-line entry point: run this tool through ToolRunner and exit
+  * with the status it returns.
+  */
+ public static void main(String[] args) throws Exception {
+     final int exitCode = ToolRunner.run(new OutputTool(), args);
+     System.exit(exitCode);
+ }
+
+ /**
+  * Maps each fact to its text form, keyed by the name of the output it
+  * belongs to. When debugging, also emits the fact's explanation under the
+  * debug output name.
+  */
+ public static class FactMapper extends Mapper<Fact, NullWritable,
+         Text, Text> {
+     // Reused for every emitted record.
+     Text k = new Text();
+     Text v = new Text();
+     private boolean debug = false;
+
+     /** Read the debug flag from the configuration. */
+     @Override
+     public void setup(Context context) {
+         this.debug = MRReasoningUtils.debug(context.getConfiguration());
+     }
+
+     @Override
+     public void map(Fact fact, NullWritable nw, Context context)
+             throws IOException, InterruptedException {
+         k.set(getOutputName(fact, true));
+         v.set(fact.toString());
+         context.write(k, v);
+         if (!debug) {
+             return;
+         }
+         k.set(MRReasoningUtils.DEBUG_OUT);
+         v.set(fact.explain(true));
+         context.write(k, v);
+     }
+ }
+
+ /**
+  * Maps each inconsistency to a text explanation, keyed by the name of
+  * the output it belongs to.
+  */
+ public static class InconsistencyMapper extends Mapper<Derivation,
+         NullWritable, Text, Text> {
+     // Reused for every emitted record.
+     Text k = new Text();
+     Text v = new Text();
+     Schema schema;
+
+     /** Load the schema that is passed to the explanation. */
+     @Override
+     public void setup(Context context) {
+         this.schema = MRReasoningUtils.loadSchema(context.getConfiguration());
+     }
+
+     @Override
+     public void map(Derivation inconsistency, NullWritable nw, Context context)
+             throws IOException, InterruptedException {
+         final String outputName = getOutputName(inconsistency);
+         final String explanation = inconsistency.explain(true, schema);
+         k.set(outputName);
+         v.set("Inconsistency:\n" + explanation + "\n");
+         context.write(k, v);
+     }
+ }
+
+ public static class OutputReducer extends Reducer<Text, Text, NullWritable, Text> {
+ private MultipleOutputs<NullWritable, Text> mout;
+ @Override
+ public void setup(Context context) {
+ mout = new MultipleOutputs<>(context);
+ }
+ @Override
+ public void reduce(Text key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ String out = key.toString();
+ for (Text value : values) {
+ mout.write(out, NullWritable.get(), value);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
new file mode 100644
index 0000000..cb2c2d0
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
@@ -0,0 +1,154 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Runs a forward-chaining reasoner until no new facts can be derived.
+ */
+public class ReasoningDriver extends Configured implements Tool {
+ /**
+  * Command-line entry point: run the driver through ToolRunner and exit
+  * with the status it returns.
+  */
+ public static void main(String[] args) throws Exception {
+     System.exit(ToolRunner.run(new ReasoningDriver(), args));
+ }
+
+ private boolean reportStats = false;
+ long numInconsistencies = 0;
+
+ /**
+  * Run the full reasoning pipeline: extract the schema if it is not already
+  * on disk, iterate forward chaining (with duplicate elimination and schema
+  * regeneration as needed) until an iteration yields nothing useful,
+  * optionally produce final output, then clean up.
+  *
+  * @return 0 on success, otherwise the non-zero status of the first tool
+  *         that failed
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+     final Configuration conf = getConf();
+     reportStats = MRReasoningUtils.stats(conf);
+     final RunStatistics runStats =
+         new RunStatistics(MRReasoningUtils.getTableName(conf));
+     int result = 0;
+     boolean productive = true;
+     boolean findings = false;
+
+     // If running against a local file, upload it
+     MRReasoningUtils.uploadIfNecessary(conf);
+
+     // Extract schema information from the database and save it to a file,
+     // unless the file already exists
+     final Path schemaPath = MRReasoningUtils.getSchemaPath(conf);
+     if (!FileSystem.get(conf).isDirectory(schemaPath)) {
+         final SchemaFilter initialFilter = new SchemaFilter();
+         result = ToolRunner.run(conf, initialFilter, args);
+         // A failed schema extraction skips the reasoning loop entirely.
+         productive = (result == 0);
+         // Record basic information about the run
+         runStats.collect(initialFilter, "SchemaFilter");
+     }
+
+     // Perform forward-chaining reasoning:
+     while (productive) {
+         MRReasoningUtils.nextIteration(conf);
+
+         // Attempt to derive new information
+         final ForwardChain forwardChain = new ForwardChain();
+         result = ToolRunner.run(conf, forwardChain, args);
+         runStats.collect(forwardChain, "ForwardChain");
+         if (result != 0) {
+             break;
+         }
+         long newInstance = forwardChain.getNumInstanceTriples();
+         long newSchema = forwardChain.getNumSchemaTriples();
+         long newInconsistencies = forwardChain.getNumInconsistencies();
+         long usefulOutput = forwardChain.getNumUsefulOutput();
+
+         // Only keep unique, newly generated facts
+         if (newInstance + newInconsistencies > 0) {
+             final DuplicateElimination dedup = new DuplicateElimination();
+             result = ToolRunner.run(conf, dedup, args);
+             runStats.collect(dedup, "DuplicateElimination");
+             if (result != 0) {
+                 break;
+             }
+             newInstance = dedup.getNumInstanceTriples();
+             newSchema = dedup.getNumSchemaTriples();
+             newInconsistencies = dedup.getNumInconsistencies();
+             usefulOutput = dedup.getNumUsefulOutput();
+         }
+
+         // If schema triples were just deduced, regenerate the whole schema
+         if (newSchema > 0) {
+             MRReasoningUtils.schemaUpdated(conf);
+             final SchemaFilter refresh = new SchemaFilter();
+             result = ToolRunner.run(conf, refresh, args);
+             runStats.collect(refresh, "SchemaFilter");
+             if (result != 0) {
+                 break;
+             }
+         }
+
+         // The per-iteration summary is printed only when the statistics
+         // report is not requested.
+         final int iteration = MRReasoningUtils.getCurrentIteration(conf);
+         if (!reportStats) {
+             System.out.println("Iteration " + iteration + ":");
+             System.out.println("\t" + newInstance + " new instance triples (" +
+                 usefulOutput + " useful for reasoning)");
+             System.out.println("\t" + newSchema + " new schema triples");
+             System.out.println("\t" + newInconsistencies + " new inconsistencies");
+         }
+         findings |= (newInconsistencies + newInstance + newSchema > 0);
+         numInconsistencies += newInconsistencies;
+
+         // Repeat if we're still generating information
+         productive = usefulOutput + newSchema > 0;
+     }
+
+     // Generate final output, if appropriate
+     if (result == 0 && findings && MRReasoningUtils.shouldOutput(conf)) {
+         final OutputTool outputTool = new OutputTool();
+         result = ToolRunner.run(conf, outputTool, args);
+         runStats.collect(outputTool, "OutputTool");
+     }
+
+     // Clean up intermediate data, if appropriate
+     MRReasoningUtils.clean(conf);
+
+     // Print stats, if specified
+     if (reportStats) {
+         System.out.println(runStats.report());
+     }
+
+     return result;
+ }
+
+ /**
+ * True if we've detected at least one inconsistency.
+ */
+ boolean hasInconsistencies() {
+ return numInconsistencies > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
new file mode 100644
index 0000000..986d8a2
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
@@ -0,0 +1,145 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.openrdf.model.Resource;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Allows us to use a URI or bnode for a key. Wraps a {@link Resource}
+ * together with an integer sort key so that records can be grouped by
+ * resource and, optionally, ordered within a group (secondary sort).
+ *
+ * Natural ordering, {@link #equals(Object)} and {@link #hashCode()} look only
+ * at the resource's string value; the sort key is consulted only by
+ * {@link SecondaryComparator}. A missing (null) resource is allowed and sorts
+ * before every non-null resource.
+ */
+public class ResourceWritable implements WritableComparable<ResourceWritable> {
+    private Resource val;
+    private int key = 0; // Allows for secondary sort
+
+    /**
+     * @return the wrapped resource, or null if none is set
+     */
+    public Resource get() {
+        return val;
+    }
+
+    /**
+     * Replace the wrapped resource. The sort key is left unchanged.
+     * @param val the new resource (may be null)
+     */
+    public void set(Resource val) {
+        this.val = val;
+    }
+
+    /**
+     * Replace both the wrapped resource and the sort key.
+     * @param val the new resource (may be null)
+     * @param sortKey the new secondary sort key
+     */
+    public void set(Resource val, int sortKey) {
+        this.val = val;
+        this.key = sortKey;
+    }
+
+    /**
+     * Replace only the sort key.
+     * @param key the new secondary sort key
+     */
+    public void setSortKey(int key) {
+        this.key = key;
+    }
+
+    /**
+     * Serialize as the resource's toString() form (empty string when there is
+     * no resource) followed by the sort key.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // readFields() assumes bnodes render as "_:" + id here and that
+        // everything else renders as a URI string.
+        out.writeUTF(val == null ? "" : val.toString());
+        out.writeInt(key);
+    }
+
+    /**
+     * Deserialize the form produced by {@link #write(DataOutput)}.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        String s = in.readUTF();
+        if (s.isEmpty()) {
+            // This instance may be reused for many records, so an empty
+            // marker must clear whatever resource the previous record left.
+            val = null;
+        }
+        else if (s.startsWith("_:")) {
+            // Strip the two-character "_:" prefix to recover the bnode id.
+            val = ValueFactoryImpl.getInstance().createBNode(s.substring(2));
+        }
+        else {
+            val = ValueFactoryImpl.getInstance().createURI(s);
+        }
+        key = in.readInt();
+    }
+
+    /**
+     * Order by the resource's string value only (the sort key is ignored).
+     * Null resources sort first, which keeps this consistent with equals().
+     */
+    @Override
+    public int compareTo(ResourceWritable other) {
+        String mine = text(this.val);
+        String theirs = text(other.val);
+        if (mine == null) {
+            return theirs == null ? 0 : -1;
+        }
+        if (theirs == null) {
+            return 1;
+        }
+        return mine.compareTo(theirs);
+    }
+
+    @Override
+    public String toString() {
+        String s = text(val);
+        return "<" + (s == null ? "" : s) + ">";
+    }
+
+    /**
+     * Two instances are equal when they have the same class and their
+     * resources have the same string value (or both have none).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || this.getClass() != o.getClass()) {
+            return false;
+        }
+        String mine = text(this.val);
+        String theirs = text(((ResourceWritable) o).val);
+        return mine == null ? theirs == null : mine.equals(theirs);
+    }
+
+    @Override
+    public int hashCode() {
+        String s = text(val);
+        return s == null ? 0 : s.hashCode();
+    }
+
+    /**
+     * Null-safe access to a resource's string value.
+     */
+    private static String text(Resource r) {
+        return r == null ? null : r.stringValue();
+    }
+
+    /**
+     * Compares keys by resource only, ignoring the sort key.
+     */
+    public static class PrimaryComparator extends WritableComparator {
+        PrimaryComparator() {
+            super(ResourceWritable.class, true);
+        }
+        @Override
+        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
+        public int compare(WritableComparable wc1, WritableComparable wc2) {
+            ResourceWritable node1 = (ResourceWritable) wc1;
+            ResourceWritable node2 = (ResourceWritable) wc2;
+            return node1.compareTo(node2);
+        }
+    }
+
+    /**
+     * Compares keys by resource first and, for equal resources, by sort key.
+     */
+    public static class SecondaryComparator extends WritableComparator {
+        SecondaryComparator() {
+            super(ResourceWritable.class, true);
+        }
+        @Override
+        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
+        public int compare(WritableComparable wc1, WritableComparable wc2) {
+            ResourceWritable node1 = (ResourceWritable) wc1;
+            ResourceWritable node2 = (ResourceWritable) wc2;
+            int result = node1.compareTo(node2);
+            if (result == 0) {
+                // Subtracting the keys could overflow for distant values.
+                result = Integer.compare(node1.key, node2.key);
+            }
+            return result;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
new file mode 100644
index 0000000..38fbe60
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
@@ -0,0 +1,253 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+
+/**
+ * Collect and report a variety of statistics for a run. Prints MapReduce
+ * metrics for each individual job, and overall totals, using Hadoop counters.
+ */
+public class RunStatistics {
+ static Map<Stat, TaskCounter> taskCounters = new HashMap<>();
+ static Map<Stat, JobCounter> jobCounters = new HashMap<>();
+
+ private enum Stat {
+ TABLE("tableName"),
+ RUN("run"),
+ ITERATION("iteration"),
+ JOB("job"),
+
+ ELAPSED_TIME("elapsed (ms)"),
+
+ TBOX_IN("tbox in"),
+ ABOX_IN("abox in"),
+ INCONSISTENCIES_OUT("inconsistencies out"),
+ TRIPLES_OUT("triples out"),
+
+ MAP_INPUT_RECORDS("map input records", TaskCounter.MAP_INPUT_RECORDS),
+ MAP_OUTPUT_RECORDS("map output records", TaskCounter.MAP_OUTPUT_RECORDS),
+ REDUCE_INPUT_GROUPS("reduce input groups", TaskCounter.REDUCE_INPUT_GROUPS),
+
+ MAPS("maps", JobCounter.TOTAL_LAUNCHED_MAPS),
+ REDUCES("reduces", JobCounter.TOTAL_LAUNCHED_REDUCES),
+ MAP_TIME("map time (ms)", JobCounter.MILLIS_MAPS),
+
+ REDUCE_TIME("reduce time (ms)", JobCounter.MILLIS_REDUCES),
+ MAP_TIME_VCORES("map time (ms) * cores", JobCounter.VCORES_MILLIS_MAPS),
+ REDUCE_TIME_VCORES("reduce time (ms) * cores", JobCounter.VCORES_MILLIS_REDUCES),
+
+ GC_TIME("gc time (ms)", TaskCounter.GC_TIME_MILLIS),
+ CPU_TIME("total cpu time (ms)", TaskCounter.CPU_MILLISECONDS),
+
+ MAP_TIME_MB("map time (ms) * memory (mb)", JobCounter.MB_MILLIS_MAPS),
+ REDUCE_TIME_MB("reduce time (ms) * memory (mb)", JobCounter.MB_MILLIS_REDUCES),
+ PHYSICAL_MEMORY_BYTES("physical memory (bytes)", TaskCounter.PHYSICAL_MEMORY_BYTES),
+ VIRTUAL_MEMORY_BYTES("virtual memory (bytes)", TaskCounter.VIRTUAL_MEMORY_BYTES),
+
+ DATA_LOCAL_MAPS("data-local maps", JobCounter.DATA_LOCAL_MAPS),
+ MAP_OUTPUT_BYTES("map output bytes", TaskCounter.MAP_OUTPUT_BYTES),
+
+ FILE_BYTES_READ("file bytes read"),
+ HDFS_BYTES_READ("hdfs bytes read"),
+ FILE_BYTES_WRITTEN("file bytes written"),
+ HDFS_BYTES_WRITTEN("hdfs bytes written"),
+
+ FRACTION_TIME_GC("proportion time in gc"),
+ FRACTION_MEMORY_USAGE("proportion allocated memory used"),
+ FRACTION_CPU_USAGE("proportion allocated cpu used");
+
+ String name;
+ Stat(String name) {
+ this.name = name;
+ }
+ Stat(String key, JobCounter jc) {
+ this.name = key;
+ jobCounters.put(this, jc);
+ }
+ Stat(String key, TaskCounter tc) {
+ this.name = key;
+ taskCounters.put(this, tc);
+ }
+ }
+
+ private class JobResult {
+ Map<Stat, String> info = new HashMap<>();
+ Map<Stat, Long> stats = new HashMap<>();
+
+ void add(JobResult other) {
+ for (Stat key : other.stats.keySet()) {
+ if (this.stats.containsKey(key)) {
+ stats.put(key, this.stats.get(key) + other.stats.get(key));
+ }
+ else {
+ stats.put(key, other.stats.get(key));
+ }
+ }
+ }
+
+ void computeMetrics() {
+ long t = stats.get(Stat.MAP_TIME) + stats.get(Stat.REDUCE_TIME);
+ long b = stats.get(Stat.PHYSICAL_MEMORY_BYTES);
+ long timeMbAllocated = stats.get(Stat.MAP_TIME_MB) + stats.get(Stat.REDUCE_TIME_MB);
+ long timeVcores = stats.get(Stat.MAP_TIME_VCORES) + stats.get(Stat.REDUCE_TIME_VCORES);
+ long gcTime = stats.get(Stat.GC_TIME);
+ long cpuTime = stats.get(Stat.CPU_TIME);
+ long tasks = stats.get(Stat.MAPS) + stats.get(Stat.REDUCES);
+ double mb = b / 1024.0 / 1024.0;
+ double avgMb = mb / tasks;
+ double timeMbUsed = t * avgMb;
+ info.put(Stat.FRACTION_TIME_GC, String.valueOf((double) gcTime / t));
+ info.put(Stat.FRACTION_MEMORY_USAGE, String.valueOf(timeMbUsed / timeMbAllocated));
+ info.put(Stat.FRACTION_CPU_USAGE, String.valueOf((double) cpuTime / timeVcores));
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < Stat.values().length; i++) {
+ Stat s = Stat.values()[i];
+ if (i > 0) {
+ sb.append(",");
+ }
+ if (info.containsKey(s)) {
+ sb.append(info.get(s));
+ }
+ else if (stats.containsKey(s)) {
+ sb.append(stats.get(s));
+ }
+ else {
+ sb.append("--");
+ }
+ }
+ return sb.toString();
+ }
+ }
+
+ List<JobResult> jobResults = new LinkedList<>();
+ Map<String, JobResult> jobTypeResults = new HashMap<>();
+ JobResult totals = new JobResult();
+
+ String runId;
+ String tableName;
+
+ /**
+ * Instantiate a RunStatistics object with respect to an overall run
+ * (which can consist of many jobs). Runs are identified by their first
+ * job.
+ * @param tableName Name of the input table
+ */
+ RunStatistics(String tableName) {
+ this.tableName = tableName;
+ totals.info.put(Stat.TABLE, tableName);
+ totals.info.put(Stat.ITERATION, "total");
+ totals.info.put(Stat.JOB, "all");
+ }
+
+ /**
+ * Collect all the statistics we're interested in for a single job.
+ * @param jobType Name of job type (ForwardChain, etc.)
+ */
+ void collect(AbstractReasoningTool tool, String name) throws IOException,
+ InterruptedException {
+ // ID is ID of the first job run
+ if (runId == null) {
+ runId = tool.getJobID().toString();
+ totals.info.put(Stat.RUN, runId);
+ }
+ JobResult jobValues = new JobResult();
+ jobValues.info.put(Stat.TABLE, tableName);
+ jobValues.info.put(Stat.RUN, runId);
+ jobValues.info.put(Stat.ITERATION, String.valueOf(tool.getIteration()));
+ jobValues.info.put(Stat.JOB, name);
+
+ jobValues.stats.put(Stat.ELAPSED_TIME, tool.getElapsedTime());
+ for (Stat key : taskCounters.keySet()) {
+ jobValues.stats.put(key, tool.getCounter(taskCounters.get(key)));
+ }
+ for (Stat key : jobCounters.keySet()) {
+ jobValues.stats.put(key, tool.getCounter(jobCounters.get(key)));
+ }
+ jobValues.stats.put(Stat.TBOX_IN, tool.getNumSchemaInput());
+ jobValues.stats.put(Stat.ABOX_IN, tool.getNumInstanceInput());
+ jobValues.stats.put(Stat.INCONSISTENCIES_OUT,
+ tool.getNumInconsistencies());
+ jobValues.stats.put(Stat.TRIPLES_OUT, tool.getNumSchemaTriples()
+ + tool.getNumInstanceTriples());
+ jobValues.stats.put(Stat.FILE_BYTES_READ, tool.getCounter(
+ FileSystemCounter.class.getName(), "FILE_BYTES_READ"));
+ jobValues.stats.put(Stat.FILE_BYTES_WRITTEN, tool.getCounter(
+ FileSystemCounter.class.getName(), "FILE_BYTES_WRITTEN"));
+ jobValues.stats.put(Stat.HDFS_BYTES_READ, tool.getCounter(
+ FileSystemCounter.class.getName(), "HDFS_BYTES_READ"));
+ jobValues.stats.put(Stat.HDFS_BYTES_WRITTEN, tool.getCounter(
+ FileSystemCounter.class.getName(), "HDFS_BYTES_WRITTEN"));
+ jobResults.add(jobValues);
+ // Add to the running total for this job type (initialize if needed)
+ if (!jobTypeResults.containsKey(name)) {
+ JobResult typeResult = new JobResult();
+ typeResult.info.put(Stat.TABLE, tableName);
+ typeResult.info.put(Stat.RUN, runId);
+ typeResult.info.put(Stat.ITERATION, "total");
+ typeResult.info.put(Stat.JOB, name);
+ jobTypeResults.put(name, typeResult);
+ }
+ jobTypeResults.get(name).add(jobValues);
+ totals.add(jobValues);
+ }
+
+ /**
+ * Report statistics for all jobs.
+ */
+ String report() {
+ StringBuilder sb = new StringBuilder();
+ // Header
+ for (int i = 0; i < Stat.values().length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(Stat.values()[i].name);
+ }
+ // One line per job
+ for (JobResult result : jobResults) {
+ result.computeMetrics();
+ sb.append("\n").append(result);
+ }
+ // Include aggregates for jobs and overall
+ if (jobTypeResults.containsKey("ForwardChain")) {
+ jobTypeResults.get("ForwardChain").computeMetrics();
+ sb.append("\n").append(jobTypeResults.get("ForwardChain"));
+ }
+ if (jobTypeResults.containsKey("DuplicateElimination")) {
+ jobTypeResults.get("DuplicateElimination").computeMetrics();
+ sb.append("\n").append(jobTypeResults.get("DuplicateElimination"));
+ }
+ totals.computeMetrics();
+ sb.append("\n").append(totals);
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
new file mode 100644
index 0000000..3455e8e
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
@@ -0,0 +1,165 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Collects the schema information stored in the table and outputs the schema
+ * (TBox) to a file. Mappers forward only schema triples; a single reducer
+ * accumulates them into one SchemaWritable and writes it out.
+ */
+public class SchemaFilter extends AbstractReasoningTool {
+    /**
+     * Wire up the three input mappers, the single reducer and schema output.
+     */
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        configureMultipleInput(SchemaTableMapper.class, SchemaRdfMapper.class,
+            SchemaFileMapper.class, true);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(Fact.class);
+        job.setReducerClass(SchemaFilterReducer.class);
+        // One reducer, so that every schema triple ends up in the same Schema.
+        job.setNumReduceTasks(1);
+        configureSchemaOutput();
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new SchemaFilter(), args));
+    }
+
+    /**
+     * Reads Accumulo key/value pairs.
+     */
+    public static class SchemaTableMapper extends Mapper<Key, Value,
+            NullWritable, Fact> {
+        private Fact fact = new Fact();
+        /**
+         * Output a triple if it is schema information.
+         */
+        @Override
+        public void map(Key row, Value data, Context context)
+                throws IOException, InterruptedException {
+            fact.setTriple(MRReasoningUtils.getStatement(row, data,
+                context.getConfiguration()));
+            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
+            if (isSchemaTriple) {
+                context.write(NullWritable.get(), fact);
+            }
+            countInput(isSchemaTriple, context);
+        }
+    }
+
+    /**
+     * Reads facts that are already in Fact form. Unlike the other two
+     * mappers, this one does not call countInput.
+     */
+    public static class SchemaFileMapper extends Mapper<Fact,
+            NullWritable, NullWritable, Fact> {
+        /**
+         * For a given fact, output it if it's a schema triple.
+         */
+        @Override
+        public void map(Fact fact, NullWritable nw, Context context)
+                throws IOException, InterruptedException {
+            if (Schema.isSchemaTriple(fact.getTriple())) {
+                context.write(NullWritable.get(), fact);
+            }
+        }
+    }
+
+    /**
+     * Reads RyaStatementWritable records.
+     */
+    public static class SchemaRdfMapper extends Mapper<LongWritable,
+            RyaStatementWritable, NullWritable, Fact> {
+        private Fact fact = new Fact();
+        /**
+         * For a given fact, output it if it's a schema triple.
+         */
+        @Override
+        public void map(LongWritable key, RyaStatementWritable rsw, Context context)
+                throws IOException, InterruptedException {
+            fact.setTriple(rsw.getRyaStatement());
+            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
+            if (isSchemaTriple) {
+                context.write(NullWritable.get(), fact);
+            }
+            countInput(isSchemaTriple, context);
+        }
+    }
+
+    /**
+     * Builds the schema from every schema triple and writes it once, in
+     * cleanup().
+     */
+    public static class SchemaFilterReducer extends Reducer<NullWritable,
+            Fact, NullWritable, SchemaWritable> {
+        private static final Logger log = Logger.getLogger(SchemaFilterReducer.class);
+        // Emit a progress message every this many triples.
+        private static final int LOG_INTERVAL = 1000;
+        private SchemaWritable schema;
+        private boolean debug = false;
+        private MultipleOutputs<?, ?> debugOut;
+        private Text debugKey = new Text();
+        private Text debugValue = new Text();
+
+        @Override
+        protected void setup(Context context) {
+            schema = new SchemaWritable();
+            debug = MRReasoningUtils.debug(context.getConfiguration());
+            debugOut = new MultipleOutputs<>(context);
+        }
+
+        /**
+         * Collect all schema information into a Schema object. Deriving
+         * additional schema information and serializing the result happen in
+         * cleanup(), after all input has been seen.
+         */
+        @Override
+        protected void reduce(NullWritable key, Iterable<Fact> triples,
+                Context context) throws IOException, InterruptedException {
+            // Skip building messages and summaries when nobody will see them.
+            boolean logProgress = log.isDebugEnabled();
+            long count = 0;
+            for (Fact fact : triples) {
+                schema.processTriple(fact.getTriple());
+                count++;
+                if (logProgress && count % LOG_INTERVAL == 0) {
+                    log.debug("After " + count + " schema triples...");
+                    log.debug(schema.getSummary());
+                }
+                if (debug) {
+                    debugKey.set("SCHEMA TRIPLE " + count);
+                    debugValue.set(fact.explain(false));
+                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey, debugValue);
+                }
+            }
+            if (logProgress) {
+                log.debug("Total: " + count + " schema triples");
+                log.debug(schema.getSummary());
+            }
+        }
+
+        /**
+         * Close the debug output, compute the schema closure and write the
+         * complete schema as this job's single output record.
+         */
+        @Override
+        protected void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (debugOut != null) {
+                debugOut.close();
+            }
+            // Perform schema-level reasoning
+            schema.closure();
+            // Output the complete schema
+            context.write(NullWritable.get(), schema);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
new file mode 100644
index 0000000..c423a59
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
@@ -0,0 +1,76 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+import mvm.rya.reasoning.OwlClass;
+import mvm.rya.reasoning.OwlProperty;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@link Schema} that implements Hadoop's {@link Writable}. The schema is
+ * serialized as a length-prefixed byte array holding two Java-serialized
+ * lists: all properties, then all classes.
+ */
+public class SchemaWritable extends Schema implements Writable {
+    /**
+     * Write the property and class lists as a length-prefixed blob.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        ArrayList<OwlProperty> propList = new ArrayList<>(properties.values());
+        ArrayList<OwlClass> classList = new ArrayList<>(classes.values());
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        // Close (and therefore flush) the object stream before taking the
+        // byte snapshot, so no buffered data can be left behind.
+        try (ObjectOutputStream stream = new ObjectOutputStream(bytes)) {
+            stream.writeObject(propList);
+            stream.writeObject(classList);
+        }
+        byte[] arr = bytes.toByteArray();
+        out.writeInt(arr.length);
+        out.write(arr);
+    }
+
+    /**
+     * Read the blob produced by {@link #write(DataOutput)} and register each
+     * property and class under its URI. Entries are added to the existing
+     * maps; the maps are not cleared first.
+     * @throws IOException if the input is malformed or a serialized class
+     *      cannot be found
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int size = in.readInt();
+        if (size < 0) {
+            throw new IOException("Invalid serialized schema length: " + size);
+        }
+        byte[] bytes = new byte[size];
+        in.readFully(bytes);
+        try (ObjectInputStream stream = new ObjectInputStream(
+                new ByteArrayInputStream(bytes))) {
+            Iterable<?> propList = (Iterable<?>) stream.readObject();
+            Iterable<?> classList = (Iterable<?>) stream.readObject();
+            for (Object p : propList) {
+                OwlProperty prop = (OwlProperty) p;
+                properties.put(prop.getURI(), prop);
+            }
+            for (Object c : classList) {
+                OwlClass owlClass = (OwlClass) c;
+                classes.put(owlClass.getURI(), owlClass);
+            }
+        }
+        catch (ClassNotFoundException e) {
+            // Continuing with a silently empty or partial schema would
+            // produce wrong reasoning results, so fail the read instead.
+            throw new IOException("Unable to deserialize schema", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
new file mode 100644
index 0000000..2084e05
--- /dev/null
+++ b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
@@ -0,0 +1,512 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.SKOS;
+
+public class LocalReasonerTest {
+ private LocalReasoner reasoner;
+ private Schema schema;
+
+ /**
+ * Load in a small schema to use in instance reasoning
+ */
+ @Before
+ public void loadSchema() {
+ schema = new Schema();
+ reasoner = new LocalReasoner(TestUtils.NODE, schema, 1, 0);
+ }
+
+ /**
+ * cax-sco
+ */
+ @Test
+ public void testInferSuperclass() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("Professor"),
+ RDFS.SUBCLASSOF, TestUtils.uri("Faculty")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("Professor")));
+ Assert.assertTrue("Type not derived from subclass",
+ reasoner.types.knownTypes.containsKey(TestUtils.uri("Faculty")));
+ }
+
+ /**
+ * prp-dom
+ */
+ @Test
+ public void testInferDomain() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("hasAlumnus"),
+ RDFS.DOMAIN, TestUtils.uri("University")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasAlumnus"), TestUtils.uri("John Doe")));
+ Assert.assertTrue("Type not derived from rdfs:domain",
+ reasoner.types.knownTypes.containsKey(TestUtils.uri("University")));
+ }
+
+ /**
+ * prp-rng
+ */
+ @Test
+ public void testInferRange() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("advisor"),
+ RDFS.RANGE, TestUtils.uri("Professor")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+ TestUtils.uri("advisor"), TestUtils.NODE));
+ Assert.assertTrue("Type not derived from rdfs:range",
+ reasoner.types.knownTypes.containsKey(TestUtils.uri("Professor")));
+ }
+
+ /**
+ * cls-nothing2
+ */
+ @Test
+ public void testNothingInconsistent() throws Exception {
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, OWL.NOTHING));
+ Assert.assertTrue("rdf:type owl:Nothing should be inconsistent",
+ reasoner.hasInconsistencies());
+ }
+
+ /**
+ * prp-inv1
+ */
+ @Test
+ public void testInverseProperty1() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("memberOf"),
+ OWL.INVERSEOF, TestUtils.uri("member")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("memberOf"), TestUtils.NODE));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(TestUtils.uri("member"))
+ && t.getObject().equals(TestUtils.uri("y"))) {
+ return;
+ }
+ }
+ Assert.fail("Should have derived inverse triple");
+ }
+
+ /**
+ * prp-inv2
+ */
+ @Test
+ public void testInverseProperty2() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("memberOf"),
+ OWL.INVERSEOF, TestUtils.uri("member")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("member"), TestUtils.NODE));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(TestUtils.uri("memberOf"))
+ && t.getObject().equals(TestUtils.uri("y"))) {
+ return;
+ }
+ }
+ Assert.fail("Should have derived inverse triple");
+ }
+
+ /**
+ * prp-spo1
+ */
+ @Test
+ public void testInferSuperproperty() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("headOf"),
+ RDFS.SUBPROPERTYOF, TestUtils.uri("worksFor")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("headOf"), TestUtils.uri("org")));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(TestUtils.uri("worksFor"))
+ && t.getObject().equals(TestUtils.uri("org"))) {
+ return;
+ }
+ }
+ Assert.fail("Superproperty not inferred from subproperty");
+ }
+
+ /**
+ * prp-symp
+ */
+ @Test
+ public void testSymmetry() throws Exception {
+ schema.processTriple(TestUtils.statement(SKOS.RELATED, RDF.TYPE,
+ OWL.SYMMETRICPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"), SKOS.RELATED,
+ TestUtils.NODE));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(SKOS.RELATED)
+ && t.getObject().equals(TestUtils.uri("y"))) {
+ return;
+ }
+ }
+ Assert.fail("Symmetric property not inferred");
+ }
+
+ /**
+ * cls-com
+ */
+ @Test
+ public void testComplementaryClasses() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("A"),
+ OWL.COMPLEMENTOF, TestUtils.uri("NotA")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("A")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("NotA")));
+ Assert.assertTrue("Complementary class membership not detected",
+ reasoner.hasInconsistencies());
+ }
+
+ /**
+ * cax-dw
+ */
+ @Test
+ public void testDisjointClasses() throws Exception {
+ schema.processTriple(TestUtils.statement(SKOS.CONCEPT, OWL.DISJOINTWITH,
+ SKOS.COLLECTION));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ SKOS.CONCEPT));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ SKOS.COLLECTION));
+ Assert.assertTrue("Disjoint class membership not detected",
+ reasoner.hasInconsistencies());
+ }
+
+ /**
+ * prp-trp
+ */
+ @Test
+ public void testTransitivePropertyIncomingOutgoing() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("subOrganizationOf"),
+ RDF.TYPE, OWL.TRANSITIVEPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("subOrganizationOf"), TestUtils.NODE));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("subOrganizationOf"), TestUtils.uri("z")));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("y"))
+ && t.getPredicate().equals(TestUtils.uri("subOrganizationOf"))
+ && t.getObject().equals(TestUtils.uri("z"))) {
+ return;
+ }
+ }
+ Assert.fail("Transitive relation not inferred (received incoming edge first)");
+ }
+ @Test
+ public void testTransitivePropertyOutgoingIncoming() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("subOrganizationOf"),
+ RDF.TYPE, OWL.TRANSITIVEPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("subOrganizationOf"), TestUtils.uri("z")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("subOrganizationOf"), TestUtils.NODE));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("y"))
+ && t.getPredicate().equals(TestUtils.uri("subOrganizationOf"))
+ && t.getObject().equals(TestUtils.uri("z"))) {
+ Assert.fail("Transitive relation should not be inferred "
+ + "(received outgoing edge first)");
+ }
+ }
+ }
+
+ /**
+ * prp-irp
+ */
+ @Test
+ public void testIrreflexiveProperty() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("hasParent"),
+ RDF.TYPE, OWL2.IRREFLEXIVEPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasParent"), TestUtils.NODE));
+ Assert.assertTrue("Relation to self via irreflexive property not detected",
+ reasoner.hasInconsistencies());
+ }
+
+ /**
+ * prp-asyp
+ */
+ @Test
+ public void testAsymmetricPropertyIncomingOutgoing() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"),
+ RDF.TYPE, OWL2.ASYMMETRICPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("hasChild"), TestUtils.NODE));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasChild"), TestUtils.uri("y")));
+ Assert.assertTrue("Symmetric relation with asymmetric property not detected"
+ + " (receiving incoming edge first)", reasoner.hasInconsistencies());
+ }
+ @Test
+ public void testAsymmetricPropertyReverseOutgoingIncoming() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"), RDF.TYPE,
+ OWL2.ASYMMETRICPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasChild"), TestUtils.uri("y")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+ TestUtils.uri("hasChild"), TestUtils.NODE));
+ Assert.assertFalse("Symmetric relation with asymmetric property should"
+ + " not be detected when receiving outgoing edge first",
+ reasoner.hasInconsistencies());
+ }
+ @Test
+ public void testAsymmetricPropertyReflexive() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"),
+ RDF.TYPE, OWL2.ASYMMETRICPROPERTY));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasChild"), TestUtils.NODE));
+ Assert.assertTrue("Self-referential relation with asymmetric property"
+ + " not detected", reasoner.hasInconsistencies());
+ }
+
+ /**
+ * prp-pdw
+ */
+ @Test
+ public void testDisjointProperties() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("p1"),
+ OWL2.PROPERTYDISJOINTWITH, TestUtils.uri("p2")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("p1"), TestUtils.uri("y")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("p2"), TestUtils.uri("y")));
+ Assert.assertTrue("Disjoint property usage not detected (left-hand side"
+ + " of propertyDisjointWith received first)", reasoner.hasInconsistencies());
+ }
+ @Test
+ public void testDisjointPropertiesReverse() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("p2"),
+ OWL2.PROPERTYDISJOINTWITH, TestUtils.uri("p1")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("p1"), TestUtils.uri("y")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("p2"), TestUtils.uri("y")));
+ Assert.assertTrue("Disjoint property usage not detected (right-hand side"
+ + " of propertyDisjointWith received first)", reasoner.hasInconsistencies());
+ }
+
+ /**
+ * cls-svf1
+ */
+ @Test
+ public void testSomeValuesFromMembership() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.SOMEVALUESFROM, TestUtils.uri("Department")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("headOf")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("Department")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+ TestUtils.uri("headOf"), TestUtils.NODE));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("John Doe"))
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("x"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:someValuesFrom c]"
+ + " on property p; (z type c) then (y p z) should imply"
+ + " (y type x).");
+ }
+ @Test
+ public void testSomeValuesFromMembershipReverse() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.SOMEVALUESFROM, TestUtils.uri("Department")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("headOf")));
+ reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+ TestUtils.uri("headOf"), TestUtils.NODE));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("Department")));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("John Doe"))
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("x"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:someValuesFrom c]"
+ + " on property p; (y p z) then (z type c) should imply"
+ + " (y type x).");
+ }
+
+ /**
+ * cls-svf2
+ */
+ @Test
+ public void testSomeValuesFromThing() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.SOMEVALUESFROM, OWL.THING));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("foo")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("foo"), TestUtils.uri("bar")));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("x"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:someValuesFrom owl:Thing]"
+ + " on property p; (y p z) should imply (y type x) for any z.");
+ }
+
+ /**
+ * cls-hv1
+ */
+ @Test
+ public void testHasValueInferValue() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"), OWL.HASVALUE,
+ TestUtils.uri("y")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("p")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("x")));
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(TestUtils.uri("p"))
+ && t.getObject().equals(TestUtils.uri("y"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:hasValue y]"
+ + " on property p; (u type x) should imply (u p y) for any u.");
+ }
+
+ /**
+ * cls-hv2
+ */
+ @Test
+ public void testHasValueInferClass() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"), OWL.HASVALUE,
+ TestUtils.uri("y")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("p")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("p"), TestUtils.uri("y")));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.NODE)
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("x"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:hasValue y]"
+ + " on property p; (u p y) should imply (u type x) for any u.");
+ }
+
+
+ /**
+ * cls-avf
+ */
+ @Test
+ public void testAllValuesFrom() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ALLVALUESFROM, TestUtils.uri("Human")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("hasParent")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasParent"), TestUtils.uri("v")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("x")));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("v"))
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("Human"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:allValuesFrom c]"
+ + " on property p; (u p v) then (u type x) should imply (v type c).");
+ }
+ @Test
+ public void testAllValuesFromReverse() throws Exception {
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ALLVALUESFROM, TestUtils.uri("Human")));
+ schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+ OWL.ONPROPERTY, TestUtils.uri("hasParent")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+ TestUtils.uri("x")));
+ reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+ TestUtils.uri("hasParent"), TestUtils.uri("v")));
+ reasoner.getTypes();
+ for (Fact t : reasoner.getFacts()) {
+ if (t.getSubject().equals(TestUtils.uri("v"))
+ && t.getPredicate().equals(RDF.TYPE)
+ && t.getObject().equals(TestUtils.uri("Human"))) {
+ return;
+ }
+ }
+ Assert.fail("If x is a property restriction [owl:allValuesFrom c]"
+ + " on property p; (u type x) then (u p v) should imply (v type c).");
+ }
+
+ /**
+  * cls-maxc1: a restriction with maxCardinality 0 on a property. After
+  * processing (NODE p y) and then (NODE type restriction), the test
+  * expects an inconsistency to be reported.
+  */
+ @Test
+ public void testMaxCardinalityZero() throws Exception {
+     URI restriction = TestUtils.uri("restriction");
+     URI property = TestUtils.uri("impossiblePredicate1");
+     schema.processTriple(TestUtils.statement(
+         restriction, OWL.MAXCARDINALITY, TestUtils.intLiteral("0")));
+     schema.processTriple(
+         TestUtils.statement(restriction, OWL.ONPROPERTY, property));
+     // Use the property once, then declare the subject a member of the
+     // restriction.
+     reasoner.processFact(
+         TestUtils.fact(TestUtils.NODE, property, TestUtils.uri("y")));
+     reasoner.processFact(
+         TestUtils.fact(TestUtils.NODE, RDF.TYPE, restriction));
+     if (!reasoner.hasInconsistencies()) {
+         Assert.fail("If p has maxCardinality of 0; then any (x p y)"
+             + " should be found inconsistent");
+     }
+ }
+
+ /**
+  * cls-maxqc2: a blank-node restriction with maxQualifiedCardinality 0 on
+  * a property, qualified on OWL.THING. After processing (NODE p y) and
+  * then (NODE type restriction), the test expects an inconsistency.
+  */
+ @Test
+ public void testMaxQCardinalityZeroThings() throws Exception {
+     Resource restriction = TestUtils.bnode("restriction");
+     URI property = TestUtils.uri("impossiblePredicate2");
+     schema.processTriple(TestUtils.statement(
+         restriction, OWL2.MAXQUALIFIEDCARDINALITY, TestUtils.intLiteral("0")));
+     schema.processTriple(
+         TestUtils.statement(restriction, OWL.ONPROPERTY, property));
+     schema.processTriple(
+         TestUtils.statement(restriction, OWL2.ONCLASS, OWL.THING));
+     reasoner.processFact(
+         TestUtils.fact(TestUtils.NODE, property, TestUtils.uri("y")));
+     reasoner.processFact(
+         TestUtils.fact(TestUtils.NODE, RDF.TYPE, restriction));
+     if (!reasoner.hasInconsistencies()) {
+         Assert.fail("If p has maxQualifiedCardinality of 0 on owl:Thing;"
+             + " then any (x p y) should be found inconsistent");
+     }
+ }
+}