Posted to commits@rya.apache.org by pu...@apache.org on 2016/06/21 17:11:41 UTC

[2/5] incubator-rya git commit: Included project rya.reasoning.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
new file mode 100644
index 0000000..714ad1c
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
@@ -0,0 +1,278 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.LocalReasoner;
+import mvm.rya.reasoning.LocalReasoner.Relevance;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Resource;
+
+public class ForwardChain extends AbstractReasoningTool {
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        distributeSchema();
+        Configuration conf = job.getConfiguration();
+        // We can skip triples previously marked irrelevant, unless the schema
+        // has just changed, in which case earlier relevance determinations
+        // can't be trusted.
+        configureMultipleInput(TableMapper.class, RdfMapper.class,
+            FileMapper.class, !MRReasoningUtils.isSchemaNew(conf));
+        job.setMapOutputKeyClass(ResourceWritable.class);
+        job.setMapOutputValueClass(Fact.class);
+        job.setReducerClass(ReasoningReducer.class);
+        job.setSortComparatorClass(ResourceWritable.SecondaryComparator.class);
+        job.setGroupingComparatorClass(ResourceWritable.PrimaryComparator.class);
+        configureDerivationOutput(true);
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new ForwardChain(), args));
+    }
+
+    /**
+     * Decide whether to output facts and with what keys. Subclasses handle
+     * different sources of input.
+     */
+    public static class ForwardChainMapper<K, V> extends Mapper<K, V,
+            ResourceWritable, Fact> {
+        protected Schema schema;
+        protected ResourceWritable node = new ResourceWritable();
+        protected MultipleOutputs<?, ?> debugOut;
+        protected boolean debug;
+        private Text debugKey = new Text();
+        private Text debugValue = new Text();
+        public ForwardChainMapper(Schema s) {
+            this.schema = s;
+        }
+        public ForwardChainMapper() {}
+
+        @Override
+        protected void setup(Context context) {
+            debugOut = new MultipleOutputs<>(context);
+            Configuration conf = context.getConfiguration();
+            if (schema == null) {
+                schema = MRReasoningUtils.loadSchema(conf);
+            }
+            debug = MRReasoningUtils.debug(conf);
+        }
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (debugOut != null) {
+                debugOut.close();
+            }
+        }
+
+        protected void process(Context context, Fact inputTriple)
+                throws IOException, InterruptedException {
+            Relevance rel = LocalReasoner.relevantFact(inputTriple, schema);
+            if (rel.subject()) {
+                node.set(inputTriple.getSubject(), 1);
+                context.write(node, inputTriple);
+                if (debug) {
+                    int i = inputTriple.getIteration();
+                    debugKey.set("MAP_OUT" + node.toString());
+                    debugValue.set(inputTriple.explain(false) + "[" + i + "]");
+                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey,
+                        debugValue);
+                }
+            }
+            if (rel.object()) {
+                node.set((Resource) inputTriple.getObject(), -1);
+                context.write(node, inputTriple);
+                if (debug) {
+                    int i = inputTriple.getIteration();
+                    debugKey.set("MAP_OUT" + node.toString());
+                    debugValue.set(inputTriple.explain(false) + "[" + i + "]");
+                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey,
+                        debugValue);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get input data from the database
+     */
+    public static class TableMapper extends ForwardChainMapper<Key, Value> {
+        private Fact inputTriple = new Fact();
+        public TableMapper() { super(); }
+        public TableMapper(Schema s) { super(s); }
+        @Override
+        public void map(Key row, Value data, Context context)
+                throws IOException, InterruptedException {
+            inputTriple.setTriple(MRReasoningUtils.getStatement(row, data,
+                context.getConfiguration()));
+            process(context, inputTriple);
+        }
+    }
+
+    /**
+     * Get intermediate data from a sequence file
+     */
+    public static class FileMapper extends ForwardChainMapper<Fact,
+            NullWritable> {
+        public FileMapper() { super(); }
+        public FileMapper(Schema s) { super(s); }
+        @Override
+        public void map(Fact inputTriple, NullWritable nw,
+                Context context) throws IOException, InterruptedException {
+            process(context, inputTriple);
+        }
+    }
+
+    /**
+     * Get input data from an RDF file
+     */
+    public static class RdfMapper extends ForwardChainMapper<LongWritable,
+            RyaStatementWritable> {
+        private Fact inputTriple = new Fact();
+        public RdfMapper() { super(); }
+        public RdfMapper(Schema s) { super(s); }
+        @Override
+        public void map(LongWritable key, RyaStatementWritable rsw,
+                Context context) throws IOException, InterruptedException {
+            inputTriple.setTriple(rsw.getRyaStatement());
+            process(context, inputTriple);
+        }
+    }
+
+    public static class ReasoningReducer extends Reducer<ResourceWritable,
+            Fact, Fact, NullWritable> {
+        private static final int LOG_INTERVAL = 5000;
+        private Logger log = Logger.getLogger(ReasoningReducer.class);
+        private MultipleOutputs<?, ?> mout;
+        private Schema schema;
+        private boolean debug;
+        private Text debugK = new Text();
+        private Text debugV = new Text();
+        private int maxStored = 0;
+        private String maxNode = "";
+        public ReasoningReducer(Schema s) {
+            this.schema = s;
+        }
+        public ReasoningReducer() {}
+        @Override
+        public void setup(Context context) {
+            mout = new MultipleOutputs<>(context);
+            Configuration conf = context.getConfiguration();
+            if (schema == null) {
+                schema = MRReasoningUtils.loadSchema(conf);
+            }
+            debug = MRReasoningUtils.debug(conf);
+        }
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (mout != null) {
+                mout.close();
+            }
+            log.info("Most input triples stored at one time by any reasoner: "
+                + maxStored + " (reasoner for node: " + maxNode + ")");
+        }
+        @Override
+        public void reduce(ResourceWritable key, Iterable<Fact> facts,
+                Context context) throws IOException, InterruptedException {
+            log.debug("Reasoning for node " + key.toString());
+            // If the schema was just updated, all facts are potentially
+            // meaningful again. Otherwise, any new derivation must use at
+            // least one fact from the previous (or this) iteration.
+            Configuration conf = context.getConfiguration();
+            LocalReasoner reasoner = new LocalReasoner(key.get(), schema,
+                MRReasoningUtils.getCurrentIteration(conf),
+                MRReasoningUtils.lastSchemaUpdate(conf));
+            long numInput = 0;
+            long numOutput = 0;
+            for (Fact fact : facts) {
+                if (debug) {
+                    debugK.set("INPUT<" + key.get().stringValue() + ">");
+                    debugV.set(fact.toString());
+                    mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+                }
+                // Hand the reasoner its own copy of each fact: Hadoop reuses
+                // the same object for every value in the iterable, and the
+                // reasoner may store facts across calls.
+                reasoner.processFact(fact.clone());
+                numInput++;
+                numOutput += handleResults(reasoner, context);
+                if (numInput % LOG_INTERVAL == 0) {
+                    log.debug(reasoner.getDiagnostics());
+                    log.debug(numInput + " input triples so far");
+                    log.debug(numOutput + " output triples/inconsistencies so far");
+                }
+            }
+            reasoner.getTypes();
+            numOutput += handleResults(reasoner, context);
+            int numStored = reasoner.getNumStored();
+            if (numStored > maxStored) {
+                maxStored = numStored;
+                maxNode = key.toString();
+            }
+            log.debug("..." + numStored + " input facts stored in memory");
+        }
+
+        /**
+         * Process any new results from a reasoner.
+         */
+        private long handleResults(LocalReasoner reasoner, Context context)
+                throws IOException, InterruptedException {
+            long numOutput = 0;
+            if (reasoner.hasNewFacts()) {
+                for (Fact fact : reasoner.getFacts()) {
+                    mout.write(getOutputName(fact), fact, NullWritable.get());
+                    numOutput++;
+                    if (debug) {
+                        debugK.set("OUTPUT<" + reasoner.getNode().stringValue() + ">");
+                        debugV.set(fact.explain(false));
+                        mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+                    }
+                }
+            }
+            if (reasoner.hasInconsistencies()) {
+                for (Derivation inconsistency : reasoner.getInconsistencies()) {
+                    mout.write(getOutputName(inconsistency), inconsistency,
+                        NullWritable.get());
+                    numOutput++;
+                    if (debug) {
+                        debugK.set("OUTPUT<" + inconsistency.getNode().stringValue() + ">");
+                        debugV.set(inconsistency.explain(false));
+                        mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+                    }
+                }
+            }
+            return numOutput;
+        }
+    }
+}
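
The job wiring above is the standard secondary-sort pattern: the mapper emits
each fact under its subject (sort key 1) and, when relevant, under its object
(sort key -1); the grouping comparator ignores the sort key, so one reduce call
sees all facts for a node, while the sort comparator orders object-keyed facts
ahead of subject-keyed ones. A minimal sketch of that comparator contract,
assuming same-package access (the comparator constructors are package-private)
and an illustrative URI:

    ValueFactoryImpl vf = ValueFactoryImpl.getInstance();
    ResourceWritable asObject = new ResourceWritable();
    ResourceWritable asSubject = new ResourceWritable();
    asObject.set(vf.createURI("http://example.org/node"), -1);  // keyed by object
    asSubject.set(vf.createURI("http://example.org/node"), 1);  // keyed by subject
    // Same resource, so both land in the same reduce group:
    new ResourceWritable.PrimaryComparator().compare(asObject, asSubject);   // == 0
    // Within the group, object-keyed facts sort first:
    new ResourceWritable.SecondaryComparator().compare(asObject, asSubject); // < 0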

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
new file mode 100644
index 0000000..3bed4ca
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
@@ -0,0 +1,336 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Convenience methods for MapReduce reasoning tasks and options.
+ */
+public class MRReasoningUtils {
+    // Configuration variables
+    public static final String WORKING_DIR = "reasoning.workingDir";
+    public static final String LOCAL_INPUT = "reasoning.inputLocal";
+    public static final String DEBUG_FLAG = "reasoning.debug";
+    public static final String OUTPUT_FLAG = "reasoning.output";
+    public static final String STATS_FLAG = "reasoning.stats";
+
+    // Variables used to pass information from drivers to jobs
+    public static final String STEP_PROP = "reasoning.step";
+    public static final String SCHEMA_UPDATE_PROP = "reasoning.schemaUpdate";
+
+    // Used to construct input/output directories
+    static final String OUTPUT_BASE = "step-";
+    static final String SCHEMA_BASE = "schema-";
+    static final String TEMP_SUFFIX = "a";
+    // Named outputs for different kinds of facts
+    static final String SCHEMA_OUT = "schema";
+    static final String INCONSISTENT_OUT = "inconsistencies";
+    static final String TERMINAL_OUT = "instance";
+    static final String INTERMEDIATE_OUT = "intermediate";
+    static final String DEBUG_OUT = "debug";
+
+    /**
+     * Load serialized schema information from a file.
+     */
+    public static Schema loadSchema(Configuration conf) {
+        SchemaWritable schema = new SchemaWritable();
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            Path schemaPath = getSchemaPath(conf);
+            if (fs.isDirectory(schemaPath)) {
+                for (FileStatus status : fs.listStatus(schemaPath)) {
+                    schemaPath = status.getPath();
+                    if (status.isFile() && status.getLen() > 0
+                        && !schemaPath.getName().startsWith(DEBUG_OUT)) {
+                        break;
+                    }
+                }
+            }
+            SequenceFile.Reader in = new SequenceFile.Reader(conf,
+                SequenceFile.Reader.file(schemaPath));
+            NullWritable key = NullWritable.get();
+            in.next(key, schema);
+            in.close();
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        return schema;
+    }
+
+    /**
+     * Record that the schema was updated at this iteration.
+     */
+    static void schemaUpdated(Configuration conf) {
+        conf.setInt(SCHEMA_UPDATE_PROP, getCurrentIteration(conf));
+    }
+
+    /**
+     * Mark the beginning of the next iteration.
+     */
+    static void nextIteration(Configuration conf) {
+        conf.setInt(STEP_PROP, getCurrentIteration(conf)+1);
+    }
+
+    /**
+     * Convert an Accumulo row to a RyaStatement.
+     */
+    static RyaStatement getStatement(Key row, Value data, Configuration conf) {
+        try {
+            RyaTripleContext ryaContext = RyaTripleContext.getInstance(
+                new AccumuloRdfConfiguration(conf));
+            RyaStatement ryaStatement = ryaContext.deserializeTriple(
+                RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
+                new TripleRow(row.getRow().getBytes(), row.getColumnFamily().getBytes(),
+                    row.getColumnQualifier().getBytes(), row.getTimestamp(),
+                    row.getColumnVisibility().getBytes(), data.get()));
+            return ryaStatement;
+        }
+        catch (TripleRowResolverException e) {
+            e.printStackTrace();
+            System.err.println("row: " + row);
+            return null;
+        }
+        catch (IllegalArgumentException e) {
+            e.printStackTrace();
+            System.err.println("row: " + row);
+            throw e;
+        }
+    }
+
+    /**
+     * Clean up intermediate data, unless debug=true
+     */
+    static void clean(Configuration conf) throws IOException {
+        if (!debug(conf)) {
+            int iteration = getCurrentIteration(conf);
+            for (int i = 0; i <= iteration; i++) {
+                deleteIfExists(conf, OUTPUT_BASE + i);
+                deleteIfExists(conf, OUTPUT_BASE + i + TEMP_SUFFIX);
+                deleteIfExists(conf, SCHEMA_BASE + i);
+            }
+            deleteIfExists(conf, "input");
+        }
+    }
+
+    /**
+     * If a local input path was given, upload it to HDFS and configure file
+     * input. Useful for automating tests against small inputs.
+     */
+    static boolean uploadIfNecessary(Configuration conf)
+            throws IOException {
+        String local = conf.get(LOCAL_INPUT);
+        if (local == null) {
+            return false;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        String current = new File("").getAbsolutePath();
+        Path sourcePath = new Path(current, local);
+        Path destPath = getOutputPath(conf, "input");
+        fs.copyFromLocalFile(false, true, sourcePath, destPath);
+        conf.set(MRUtils.INPUT_PATH, destPath.toString());
+        return true;
+    }
+
+    /**
+     * Delete an HDFS directory if it exists
+     */
+    static void deleteIfExists(Configuration conf, String rel)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path path = getOutputPath(conf, rel);
+        if (fs.isDirectory(path) || fs.isFile(path)) {
+            fs.delete(path, true);
+        }
+    }
+
+    /**
+     * Get a Repository from the configuration variables
+     */
+    static RyaSailRepository getRepository(Configuration conf)
+            throws AccumuloException, AccumuloSecurityException {
+        boolean mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, false);
+        String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+        String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+        String password = conf.get(MRUtils.AC_PWD_PROP, "root");
+        Instance accumulo;
+        if (mock) {
+            accumulo = new MockInstance(instance);
+        }
+        else {
+            String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+            accumulo = new ZooKeeperInstance(instance, zookeepers);
+        }
+        Connector connector = accumulo.getConnector(username, new PasswordToken(password));
+        AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf);
+        aconf.setTablePrefix(conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+            RdfCloudTripleStoreConstants.TBL_PRFX_DEF));
+        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        dao.setConf(aconf);
+        RdfCloudTripleStore store = new RdfCloudTripleStore();
+        store.setRyaDAO(dao);
+        return new RyaSailRepository(store);
+    }
+
+    /**
+     * Set up a MapReduce Job to use Accumulo as input.
+     */
+    static void configureAccumuloInput(Job job)
+            throws AccumuloSecurityException {
+        Configuration conf = job.getConfiguration();
+        String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+        String password = conf.get(MRUtils.AC_PWD_PROP, "");
+        String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+        String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+        Authorizations auths;
+        String auth = conf.get(MRUtils.AC_AUTH_PROP);
+        if (auth != null) {
+            auths = new Authorizations(auth.split(","));
+        }
+        else {
+            auths = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        AccumuloInputFormat.setZooKeeperInstance(job,
+            ClientConfiguration.loadDefault()
+            .withInstance(instance).withZkHosts(zookeepers));
+        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        AccumuloInputFormat.setInputTableName(job, getTableName(conf));
+        AccumuloInputFormat.setScanAuthorizations(job, auths);
+    }
+
+    /**
+     * Get the table name that will be used for Accumulo input.
+     */
+    static String getTableName(Configuration conf) {
+        String layout = conf.get(MRUtils.TABLE_LAYOUT_PROP,
+            RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO.toString());
+        String prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+            RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+        return RdfCloudTripleStoreUtils.layoutPrefixToTable(
+            RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(layout), prefix);
+    }
+
+    /**
+     * Whether we should output the final inferences.
+     */
+    static boolean shouldOutput(Configuration conf) {
+        return conf.getBoolean(OUTPUT_FLAG, true);
+    }
+
+    /**
+     * Return whether the debug flag is on.
+     */
+    static boolean debug(Configuration conf) {
+        return conf.getBoolean(DEBUG_FLAG, false);
+    }
+
+    /**
+     * Return whether detailed statistics should be printed.
+     */
+    static boolean stats(Configuration conf) {
+        return conf.getBoolean(STATS_FLAG, false);
+    }
+
+    /**
+     * Get the Path for RDF file input, or null if not given.
+     */
+    static Path getInputPath(Configuration conf) {
+        String in = conf.get(MRUtils.INPUT_PATH);
+        if (in == null) {
+            return null;
+        }
+        return new Path(in);
+    }
+
+    /**
+     * Get the full output path for a configuration and relative pathname.
+     */
+    static Path getOutputPath(Configuration conf, String name) {
+        String root = conf.get(WORKING_DIR, "tmp/reasoning");
+        return new Path(root + "/" + name);
+    }
+
+    /**
+     * Get the path to the Schema.
+     */
+    static Path getSchemaPath(Configuration conf) {
+        int iteration = lastSchemaUpdate(conf);
+        return getOutputPath(conf, SCHEMA_BASE + iteration);
+    }
+
+    /**
+     * Get the current iteration, useful for keeping track of when facts were
+     * generated.
+     */
+    public static int getCurrentIteration(Configuration conf) {
+        return conf.getInt(STEP_PROP, 0);
+    }
+
+    /**
+     * Get the iteration at which the schema was last updated.
+     */
+    static int lastSchemaUpdate(Configuration conf) {
+        return conf.getInt(SCHEMA_UPDATE_PROP, 0);
+    }
+
+    /**
+     * True if the schema was just updated on the last pass.
+     */
+    public static boolean isSchemaNew(Configuration conf) {
+        return lastSchemaUpdate(conf) == getCurrentIteration(conf) - 1;
+    }
+}
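
The step and schema-update properties drive the driver's control flow:
isSchemaNew() just checks whether the schema was regenerated on the
immediately preceding pass. A small sketch of the bookkeeping, assuming
same-package access to the package-private helpers:

    Configuration conf = new Configuration();
    MRReasoningUtils.getCurrentIteration(conf); // 0 (STEP_PROP defaults to 0)
    MRReasoningUtils.nextIteration(conf);       // iteration is now 1
    MRReasoningUtils.schemaUpdated(conf);       // schema last updated at iteration 1
    MRReasoningUtils.nextIteration(conf);       // iteration is now 2
    MRReasoningUtils.isSchemaNew(conf);         // true: lastSchemaUpdate == iteration - 1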

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
new file mode 100644
index 0000000..527b887
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
@@ -0,0 +1,110 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Collect inferred triples and detected inconsistencies as text.
+ */
+public class OutputTool extends AbstractReasoningTool {
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        MRReasoningUtils.deleteIfExists(job.getConfiguration(), "final");
+        configureFileInput(FactMapper.class, InconsistencyMapper.class, false);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setNumReduceTasks(1);
+        job.setReducerClass(OutputReducer.class);
+        configureTextOutput("final");
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new OutputTool(), args));
+    }
+
+    public static class FactMapper extends Mapper<Fact, NullWritable,
+            Text, Text> {
+        Text k = new Text();
+        Text v = new Text();
+        private boolean debug = false;
+        @Override
+        public void setup(Context context) {
+            debug = MRReasoningUtils.debug(context.getConfiguration());
+        }
+        @Override
+        public void map(Fact fact, NullWritable nw, Context context)
+                throws IOException, InterruptedException {
+            k.set(getOutputName(fact, true));
+            v.set(fact.toString());
+            context.write(k, v);
+            if (debug) {
+                k.set(MRReasoningUtils.DEBUG_OUT);
+                v.set(fact.explain(true));
+                context.write(k, v);
+            }
+        }
+    }
+
+    public static class InconsistencyMapper extends Mapper<Derivation,
+            NullWritable, Text, Text> {
+        Text k = new Text();
+        Text v = new Text();
+        Schema schema;
+        @Override
+        public void setup(Context context) {
+            schema = MRReasoningUtils.loadSchema(context.getConfiguration());
+        }
+        @Override
+        public void map(Derivation inconsistency, NullWritable nw, Context context)
+                throws IOException, InterruptedException {
+            k.set(getOutputName(inconsistency));
+            v.set("Inconsistency:\n" + inconsistency.explain(true, schema) + "\n");
+            context.write(k, v);
+        }
+    }
+
+    public static class OutputReducer extends Reducer<Text, Text, NullWritable, Text> {
+        private MultipleOutputs<NullWritable, Text> mout;
+        @Override
+        public void setup(Context context) {
+            mout = new MultipleOutputs<>(context);
+        }
+        @Override
+        public void reduce(Text key, Iterable<Text> values, Context context)
+                throws IOException, InterruptedException {
+            String out = key.toString();
+            for (Text value : values) {
+                mout.write(out, NullWritable.get(), value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
new file mode 100644
index 0000000..cb2c2d0
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
@@ -0,0 +1,154 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Runs a forward-chaining reasoner until no new facts can be derived.
+ */
+public class ReasoningDriver extends Configured implements Tool {
+    public static void main(String[] args) throws Exception {
+        int result = ToolRunner.run(new ReasoningDriver(), args);
+        System.exit(result);
+    }
+
+    private boolean reportStats = false;
+    long numInconsistencies = 0;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Configuration conf = getConf();
+        reportStats = MRReasoningUtils.stats(conf);
+        int iteration = 0;
+        long newInconsistencies;
+        long newInstance;
+        long newSchema;
+        long usefulOutput;
+        int result = 0;
+        boolean productive = true;
+        boolean findings = false;
+        SchemaFilter filter;
+        ForwardChain fc;
+        DuplicateElimination de;
+        RunStatistics runStats = new RunStatistics(MRReasoningUtils.getTableName(conf));
+
+        // If running against a local file, upload it
+        MRReasoningUtils.uploadIfNecessary(conf);
+
+        // Extract schema information from the database and save it to a file,
+        // unless the file already exists
+        Path schemaPath = MRReasoningUtils.getSchemaPath(conf);
+        if (!FileSystem.get(conf).isDirectory(schemaPath)) {
+            filter = new SchemaFilter();
+            result = ToolRunner.run(conf, filter, args);
+            if (result != 0) {
+                productive = false;
+            }
+            // Record basic information about the run
+            runStats.collect(filter, "SchemaFilter");
+        }
+
+        // Perform forward-chaining reasoning:
+        while (productive) {
+            MRReasoningUtils.nextIteration(conf);
+            // Attempt to derive new information
+            fc = new ForwardChain();
+            result = ToolRunner.run(conf, fc, args);
+            runStats.collect(fc, "ForwardChain");
+            if (result != 0) {
+                break;
+            }
+
+            // Only keep unique, newly generated facts
+            newInstance = fc.getNumInstanceTriples();
+            newSchema = fc.getNumSchemaTriples();
+            newInconsistencies = fc.getNumInconsistencies();
+            usefulOutput = fc.getNumUsefulOutput();
+            if (newInstance + newInconsistencies > 0) {
+                de = new DuplicateElimination();
+                result = ToolRunner.run(conf, de, args);
+                runStats.collect(de, "DuplicateElimination");
+                if (result != 0) {
+                    break;
+                }
+                newInstance = de.getNumInstanceTriples();
+                newSchema = de.getNumSchemaTriples();
+                newInconsistencies = de.getNumInconsistencies();
+                usefulOutput = de.getNumUsefulOutput();
+            }
+
+            // If schema triples were just deduced, regenerate the whole schema
+            if (newSchema > 0) {
+                MRReasoningUtils.schemaUpdated(conf);
+                filter = new SchemaFilter();
+                result = ToolRunner.run(conf, filter, args);
+                runStats.collect(filter, "SchemaFilter");
+                if (result != 0) {
+                    break;
+                }
+            }
+
+            iteration = MRReasoningUtils.getCurrentIteration(conf);
+            if (!reportStats) {
+                System.out.println("Iteration " + iteration + ":");
+                System.out.println("\t" + newInstance + " new instance triples (" +
+                    usefulOutput + " useful for reasoning)");
+                System.out.println("\t" + newSchema + " new schema triples");
+                System.out.println("\t" + newInconsistencies + " new inconsistencies");
+            }
+            if (newInconsistencies + newInstance + newSchema > 0) {
+                findings = true;
+            }
+            numInconsistencies += newInconsistencies;
+            // Repeat if we're still generating information
+            productive = usefulOutput + newSchema > 0;
+        }
+
+        // Generate final output, if appropriate
+        if (result == 0 && findings && MRReasoningUtils.shouldOutput(conf)) {
+            OutputTool out = new OutputTool();
+            result = ToolRunner.run(conf, out, args);
+            runStats.collect(out, "OutputTool");
+        }
+
+        // Clean up intermediate data, if appropriate
+        MRReasoningUtils.clean(conf);
+
+        // Print stats, if specified
+        if (reportStats) {
+            System.out.println(runStats.report());
+        }
+
+        return result;
+    }
+
+    /**
+     * True if we've detected at least one inconsistency.
+     */
+    boolean hasInconsistencies() {
+        return numInconsistencies > 0;
+    }
+}
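
For small local tests the driver can also be run programmatically against a
mock Accumulo instance. A hedged sketch using only configuration keys
referenced in this commit (the values are placeholders):

    Configuration conf = new Configuration();
    conf.setBoolean(MRUtils.AC_MOCK_PROP, true);   // MockInstance instead of ZooKeeper
    conf.set(MRUtils.AC_INSTANCE_PROP, "instance");
    conf.set(MRUtils.AC_USERNAME_PROP, "root");
    conf.set(MRUtils.AC_PWD_PROP, "root");
    conf.set(MRReasoningUtils.WORKING_DIR, "tmp/reasoning-demo");
    conf.setBoolean(MRReasoningUtils.STATS_FLAG, true);
    System.exit(ToolRunner.run(conf, new ReasoningDriver(), new String[0]));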

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
new file mode 100644
index 0000000..986d8a2
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
@@ -0,0 +1,145 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.openrdf.model.Resource;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Allows us to use a URI or bnode as a MapReduce key.
+ */
+public class ResourceWritable implements WritableComparable<ResourceWritable> {
+    private Resource val;
+    private int key = 0; // Allows for secondary sort
+
+    public Resource get() {
+        return val;
+    }
+
+    public void set(Resource val) {
+        this.val = val;
+    }
+
+    public void set(Resource val, int sortKey) {
+        this.val = val;
+        this.key = sortKey;
+    }
+
+    public void setSortKey(int key) {
+        this.key = key;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (val == null) {
+            out.writeUTF("");
+        }
+        else {
+            out.writeUTF(val.toString());
+        }
+        out.writeInt(key);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        String s = in.readUTF();
+        if (s.length() > 0) {
+            if (s.startsWith("_")) {
+                val = ValueFactoryImpl.getInstance().createBNode(s.substring(2));
+            }
+            else {
+                val = ValueFactoryImpl.getInstance().createURI(s);
+            }
+        }
+        key = in.readInt();
+    }
+
+    @Override
+    public int compareTo(ResourceWritable other) {
+        return val.stringValue().compareTo(other.val.stringValue());
+    }
+
+    @Override
+    public String toString() {
+        return "<" + val.stringValue() + ">";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        else if (o == null || this.getClass() != o.getClass()) {
+            return false;
+        }
+        ResourceWritable other = (ResourceWritable) o;
+        if (this.val == null) {
+            return other.val == null;
+        }
+        else if (other.val == null) {
+            return false;
+        }
+        else if (this.val.stringValue() == null) {
+            return other.val.stringValue() == null;
+        }
+        else {
+            return this.val.stringValue().equals(other.val.stringValue());
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return val != null ? val.stringValue().hashCode() : 0;
+    }
+
+    public static class PrimaryComparator extends WritableComparator {
+        PrimaryComparator() {
+            super(ResourceWritable.class, true);
+        }
+        @Override
+        public int compare(WritableComparable wc1, WritableComparable wc2) {
+            ResourceWritable node1 = (ResourceWritable) wc1;
+            ResourceWritable node2 = (ResourceWritable) wc2;
+            return node1.compareTo(node2);
+        }
+    }
+
+    public static class SecondaryComparator extends WritableComparator {
+        SecondaryComparator() {
+            super(ResourceWritable.class, true);
+        }
+        @Override
+        public int compare(WritableComparable wc1, WritableComparable wc2) {
+            ResourceWritable node1 = (ResourceWritable) wc1;
+            ResourceWritable node2 = (ResourceWritable) wc2;
+            int result = node1.compareTo(node2);
+            if (result == 0) {
+                result = node1.key - node2.key;
+            }
+            return result;
+        }
+    }
+}
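
Serialization is a single writeUTF of the resource's string form plus the int
sort key; bnodes are recognized on read by their "_:" prefix. A quick
round-trip sketch (imports from java.io assumed; the URI is illustrative):

    ResourceWritable before = new ResourceWritable();
    before.set(ValueFactoryImpl.getInstance().createURI("http://example.org/x"), 1);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    before.write(new DataOutputStream(buf));
    ResourceWritable after = new ResourceWritable();
    after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    before.equals(after); // true: equality compares string values, ignoring the sort key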

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
new file mode 100644
index 0000000..38fbe60
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java
@@ -0,0 +1,253 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+
+/**
+ * Collect and report a variety of statistics for a run. Prints MapReduce
+ * metrics for each individual job, and overall totals, using Hadoop counters.
+ */
+public class RunStatistics {
+    static Map<Stat, TaskCounter> taskCounters = new HashMap<>();
+    static Map<Stat, JobCounter> jobCounters = new HashMap<>();
+
+    private enum Stat {
+        TABLE("tableName"),
+        RUN("run"),
+        ITERATION("iteration"),
+        JOB("job"),
+
+        ELAPSED_TIME("elapsed (ms)"),
+
+        TBOX_IN("tbox in"),
+        ABOX_IN("abox in"),
+        INCONSISTENCIES_OUT("inconsistencies out"),
+        TRIPLES_OUT("triples out"),
+
+        MAP_INPUT_RECORDS("map input records", TaskCounter.MAP_INPUT_RECORDS),
+        MAP_OUTPUT_RECORDS("map output records", TaskCounter.MAP_OUTPUT_RECORDS),
+        REDUCE_INPUT_GROUPS("reduce input groups", TaskCounter.REDUCE_INPUT_GROUPS),
+
+        MAPS("maps", JobCounter.TOTAL_LAUNCHED_MAPS),
+        REDUCES("reduces", JobCounter.TOTAL_LAUNCHED_REDUCES),
+        MAP_TIME("map time (ms)", JobCounter.MILLIS_MAPS),
+
+        REDUCE_TIME("reduce time (ms)", JobCounter.MILLIS_REDUCES),
+        MAP_TIME_VCORES("map time (ms) * cores", JobCounter.VCORES_MILLIS_MAPS),
+        REDUCE_TIME_VCORES("reduce time (ms) * cores", JobCounter.VCORES_MILLIS_REDUCES),
+
+        GC_TIME("gc time (ms)", TaskCounter.GC_TIME_MILLIS),
+        CPU_TIME("total cpu time (ms)", TaskCounter.CPU_MILLISECONDS),
+
+        MAP_TIME_MB("map time (ms) * memory (mb)", JobCounter.MB_MILLIS_MAPS),
+        REDUCE_TIME_MB("reduce time (ms) * memory (mb)", JobCounter.MB_MILLIS_REDUCES),
+        PHYSICAL_MEMORY_BYTES("physical memory (bytes)", TaskCounter.PHYSICAL_MEMORY_BYTES),
+        VIRTUAL_MEMORY_BYTES("virtual memory (bytes)", TaskCounter.VIRTUAL_MEMORY_BYTES),
+
+        DATA_LOCAL_MAPS("data-local maps", JobCounter.DATA_LOCAL_MAPS),
+        MAP_OUTPUT_BYTES("map output bytes", TaskCounter.MAP_OUTPUT_BYTES),
+
+        FILE_BYTES_READ("file bytes read"),
+        HDFS_BYTES_READ("hdfs bytes read"),
+        FILE_BYTES_WRITTEN("file bytes written"),
+        HDFS_BYTES_WRITTEN("hdfs bytes written"),
+
+        FRACTION_TIME_GC("proportion time in gc"),
+        FRACTION_MEMORY_USAGE("proportion allocated memory used"),
+        FRACTION_CPU_USAGE("proportion allocated cpu used");
+
+        String name;
+        Stat(String name) {
+            this.name = name;
+        }
+        Stat(String key, JobCounter jc) {
+            this.name = key;
+            jobCounters.put(this, jc);
+        }
+        Stat(String key, TaskCounter tc) {
+            this.name = key;
+            taskCounters.put(this, tc);
+        }
+    }
+
+    private class JobResult {
+        Map<Stat, String> info = new HashMap<>();
+        Map<Stat, Long> stats = new HashMap<>();
+
+        void add(JobResult other) {
+            for (Stat key : other.stats.keySet()) {
+                if (this.stats.containsKey(key)) {
+                    stats.put(key, this.stats.get(key) + other.stats.get(key));
+                }
+                else {
+                    stats.put(key, other.stats.get(key));
+                }
+            }
+        }
+
+        void computeMetrics() {
+            long t = stats.get(Stat.MAP_TIME) + stats.get(Stat.REDUCE_TIME);
+            long b = stats.get(Stat.PHYSICAL_MEMORY_BYTES);
+            long timeMbAllocated = stats.get(Stat.MAP_TIME_MB) + stats.get(Stat.REDUCE_TIME_MB);
+            long timeVcores = stats.get(Stat.MAP_TIME_VCORES) + stats.get(Stat.REDUCE_TIME_VCORES);
+            long gcTime = stats.get(Stat.GC_TIME);
+            long cpuTime = stats.get(Stat.CPU_TIME);
+            long tasks = stats.get(Stat.MAPS) + stats.get(Stat.REDUCES);
+            double mb = b / 1024.0 / 1024.0;
+            double avgMb = mb / tasks;
+            double timeMbUsed = t * avgMb;
+            info.put(Stat.FRACTION_TIME_GC, String.valueOf((double) gcTime / t));
+            info.put(Stat.FRACTION_MEMORY_USAGE, String.valueOf(timeMbUsed / timeMbAllocated));
+            info.put(Stat.FRACTION_CPU_USAGE, String.valueOf((double) cpuTime / timeVcores));
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < Stat.values().length; i++) {
+                Stat s = Stat.values()[i];
+                if (i > 0) {
+                    sb.append(",");
+                }
+                if (info.containsKey(s)) {
+                    sb.append(info.get(s));
+                }
+                else if (stats.containsKey(s)) {
+                    sb.append(stats.get(s));
+                }
+                else {
+                    sb.append("--");
+                }
+            }
+            return sb.toString();
+        }
+    }
+
+    List<JobResult> jobResults = new LinkedList<>();
+    Map<String, JobResult> jobTypeResults = new HashMap<>();
+    JobResult totals = new JobResult();
+
+    String runId;
+    String tableName;
+
+    /**
+     * Instantiate a RunStatistics object with respect to an overall run
+     * (which can consist of many jobs). Runs are identified by their first
+     * job.
+     * @param   tableName   Name of the input table
+     */
+    RunStatistics(String tableName) {
+        this.tableName = tableName;
+        totals.info.put(Stat.TABLE, tableName);
+        totals.info.put(Stat.ITERATION, "total");
+        totals.info.put(Stat.JOB, "all");
+    }
+
+    /**
+     * Collect all the statistics we're interested in for a single job.
+     * @param   tool    The tool that just ran the job
+     * @param   name    Name of the job type (ForwardChain, etc.)
+     */
+    void collect(AbstractReasoningTool tool, String name) throws IOException,
+            InterruptedException {
+        // ID is ID of the first job run
+        if (runId == null) {
+            runId = tool.getJobID().toString();
+            totals.info.put(Stat.RUN, runId);
+        }
+        JobResult jobValues = new JobResult();
+        jobValues.info.put(Stat.TABLE, tableName);
+        jobValues.info.put(Stat.RUN, runId);
+        jobValues.info.put(Stat.ITERATION, String.valueOf(tool.getIteration()));
+        jobValues.info.put(Stat.JOB, name);
+
+        jobValues.stats.put(Stat.ELAPSED_TIME, tool.getElapsedTime());
+        for (Stat key : taskCounters.keySet()) {
+            jobValues.stats.put(key, tool.getCounter(taskCounters.get(key)));
+        }
+        for (Stat key : jobCounters.keySet()) {
+            jobValues.stats.put(key, tool.getCounter(jobCounters.get(key)));
+        }
+        jobValues.stats.put(Stat.TBOX_IN, tool.getNumSchemaInput());
+        jobValues.stats.put(Stat.ABOX_IN, tool.getNumInstanceInput());
+        jobValues.stats.put(Stat.INCONSISTENCIES_OUT,
+            tool.getNumInconsistencies());
+        jobValues.stats.put(Stat.TRIPLES_OUT, tool.getNumSchemaTriples()
+            + tool.getNumInstanceTriples());
+        jobValues.stats.put(Stat.FILE_BYTES_READ, tool.getCounter(
+            FileSystemCounter.class.getName(), "FILE_BYTES_READ"));
+        jobValues.stats.put(Stat.FILE_BYTES_WRITTEN, tool.getCounter(
+            FileSystemCounter.class.getName(), "FILE_BYTES_WRITTEN"));
+        jobValues.stats.put(Stat.HDFS_BYTES_READ, tool.getCounter(
+            FileSystemCounter.class.getName(), "HDFS_BYTES_READ"));
+        jobValues.stats.put(Stat.HDFS_BYTES_WRITTEN, tool.getCounter(
+            FileSystemCounter.class.getName(), "HDFS_BYTES_WRITTEN"));
+        jobResults.add(jobValues);
+        // Add to the running total for this job type (initialize if needed)
+        if (!jobTypeResults.containsKey(name)) {
+            JobResult typeResult = new JobResult();
+            typeResult.info.put(Stat.TABLE, tableName);
+            typeResult.info.put(Stat.RUN, runId);
+            typeResult.info.put(Stat.ITERATION, "total");
+            typeResult.info.put(Stat.JOB, name);
+            jobTypeResults.put(name, typeResult);
+        }
+        jobTypeResults.get(name).add(jobValues);
+        totals.add(jobValues);
+    }
+
+    /**
+     * Report statistics for all jobs.
+     */
+    String report() {
+        StringBuilder sb = new StringBuilder();
+        // Header
+        for (int i = 0; i < Stat.values().length; i++) {
+            if (i > 0) {
+                sb.append(",");
+            }
+            sb.append(Stat.values()[i].name);
+        }
+        // One line per job
+        for (JobResult result : jobResults) {
+            result.computeMetrics();
+            sb.append("\n").append(result);
+        }
+        // Include aggregates for jobs and overall
+        if (jobTypeResults.containsKey("ForwardChain")) {
+            jobTypeResults.get("ForwardChain").computeMetrics();
+            sb.append("\n").append(jobTypeResults.get("ForwardChain"));
+        }
+        if (jobTypeResults.containsKey("DuplicateElimination")) {
+            jobTypeResults.get("DuplicateElimination").computeMetrics();
+            sb.append("\n").append(jobTypeResults.get("DuplicateElimination"));
+        }
+        totals.computeMetrics();
+        sb.append("\n").append(totals);
+        return sb.toString();
+    }
+}
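
The driver feeds RunStatistics after each job, and report() renders CSV: a
header row, one row per job, totals per job type, and a grand total. A usage
sketch mirroring ReasoningDriver (conf and args as in the driver's run
method):

    RunStatistics runStats = new RunStatistics(MRReasoningUtils.getTableName(conf));
    ForwardChain fc = new ForwardChain();
    int result = ToolRunner.run(conf, fc, args);
    runStats.collect(fc, "ForwardChain");  // records counters, timings, triple counts
    System.out.println(runStats.report());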

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
new file mode 100644
index 0000000..3455e8e
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java
@@ -0,0 +1,165 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Collects the schema information stored in the table and outputs the schema
+ * (TBox) to a file.
+ */
+public class SchemaFilter extends AbstractReasoningTool {
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        configureMultipleInput(SchemaTableMapper.class, SchemaRdfMapper.class,
+            SchemaFileMapper.class, true);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(Fact.class);
+        job.setReducerClass(SchemaFilterReducer.class);
+        job.setNumReduceTasks(1);
+        configureSchemaOutput();
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new SchemaFilter(), args));
+    }
+
+    public static class SchemaTableMapper extends Mapper<Key, Value,
+            NullWritable, Fact> {
+        private Fact fact = new Fact();
+        /**
+         * Output a triple if it is schema information.
+         */
+        @Override
+        public void map(Key row, Value data, Context context)
+                throws IOException, InterruptedException {
+            fact.setTriple(MRReasoningUtils.getStatement(row, data,
+                context.getConfiguration()));
+            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
+            if (isSchemaTriple) {
+                context.write(NullWritable.get(), fact);
+            }
+            countInput(isSchemaTriple, context);
+        }
+    }
+
+    public static class SchemaFileMapper extends Mapper<Fact,
+            NullWritable, NullWritable, Fact> {
+        /**
+         * For a given fact, output it if it's a schema triple.
+         */
+        @Override
+        public void map(Fact fact, NullWritable nw, Context context)
+            throws IOException, InterruptedException {
+            if (Schema.isSchemaTriple(fact.getTriple())) {
+                context.write(NullWritable.get(), fact);
+            }
+        }
+    }
+
+    public static class SchemaRdfMapper extends Mapper<LongWritable,
+            RyaStatementWritable, NullWritable, Fact> {
+        private Fact fact = new Fact();
+        /**
+         * Convert an input statement to a fact, and output it if it's a
+         * schema triple.
+         */
+        @Override
+        public void map(LongWritable key, RyaStatementWritable rsw, Context context)
+            throws IOException, InterruptedException {
+            fact.setTriple(rsw.getRyaStatement());
+            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
+            if (isSchemaTriple) {
+                context.write(NullWritable.get(), fact);
+            }
+            countInput(isSchemaTriple, context);
+        }
+    }
+
+    public static class SchemaFilterReducer extends Reducer<NullWritable,
+            Fact, NullWritable, SchemaWritable> {
+        private SchemaWritable schema;
+        private static final Logger log = Logger.getLogger(SchemaFilterReducer.class);
+        private static final int LOG_INTERVAL = 1000;
+        private boolean debug = false;
+        private MultipleOutputs<?, ?> debugOut;
+        private Text debugKey = new Text();
+        private Text debugValue = new Text();
+
+        @Override
+        protected void setup(Context context) {
+            schema = new SchemaWritable();
+            debug = MRReasoningUtils.debug(context.getConfiguration());
+            debugOut = new MultipleOutputs<>(context);
+        }
+
+        /**
+         * Collect all schema information into a single Schema object. Once
+         * every schema triple has been processed, cleanup() computes the
+         * schema closure and serializes the result to an HDFS file.
+         */
+        @Override
+        protected void reduce(NullWritable key, Iterable<Fact> triples,
+                Context context) throws IOException, InterruptedException {
+            long count = 0;
+            for (Fact fact : triples) {
+                schema.processTriple(fact.getTriple());
+                count++;
+                if (count % LOG_INTERVAL == 0) {
+                    log.debug("After " + count + " schema triples...");
+                    log.debug(schema.getSummary());
+                }
+                if (debug) {
+                    debugKey.set("SCHEMA TRIPLE " + count);
+                    debugValue.set(fact.explain(false));
+                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey, debugValue);
+                }
+            }
+            log.debug("Total: " + count + " schema triples");
+            log.debug(schema.getSummary());
+        }
+
+        @Override
+        protected void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (debugOut != null) {
+                debugOut.close();
+            }
+            // Perform schema-level reasoning
+            schema.closure();
+            // Output the complete schema
+            context.write(NullWritable.get(), schema);
+        }
+    }
+}
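
Since SchemaFilter runs through ToolRunner, the standard Hadoop generic
options apply. A hypothetical invocation (the jar name and configuration
file below are illustrative only, not part of this patch):

    hadoop jar rya.reasoning.jar mvm.rya.reasoning.mr.SchemaFilter \
        -conf rya-reasoning-site.xml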

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
new file mode 100644
index 0000000..c423a59
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java
@@ -0,0 +1,76 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+import mvm.rya.reasoning.OwlClass;
+import mvm.rya.reasoning.OwlProperty;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.hadoop.io.Writable;
+
+public class SchemaWritable extends Schema implements Writable {
+    @Override
+    public void write(DataOutput out) throws IOException {
+        ArrayList<OwlProperty> propList = new ArrayList<>(properties.values());
+        ArrayList<OwlClass> classList = new ArrayList<>(classes.values());
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        ObjectOutputStream stream = new ObjectOutputStream(bytes);
+        stream.writeObject(propList);
+        stream.writeObject(classList);
+        // Close (and thereby flush) the object stream before extracting the
+        // bytes, so nothing is left behind in its internal buffer.
+        stream.close();
+        byte[] arr = bytes.toByteArray();
+        out.writeInt(arr.length);
+        out.write(arr);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int size = in.readInt();
+        byte[] bytes = new byte[size];
+        in.readFully(bytes);
+        ObjectInputStream stream = new ObjectInputStream(
+            new ByteArrayInputStream(bytes));
+        try {
+            Iterable<?> propList = (Iterable<?>) stream.readObject();
+            Iterable<?> classList = (Iterable<?>) stream.readObject();
+            for (Object p : propList) {
+                OwlProperty prop = (OwlProperty) p;
+                properties.put(prop.getURI(), prop);
+            }
+            for (Object c : classList) {
+                OwlClass owlClass = (OwlClass) c;
+                classes.put(owlClass.getURI(), owlClass);
+            }
+        }
+        catch (ClassNotFoundException e) {
+            // A missing class makes the serialized schema unreadable; fail
+            // loudly instead of silently producing an empty schema.
+            throw new IOException("Unable to deserialize schema", e);
+        }
+        finally {
+            stream.close();
+        }
+    }
+}
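
A minimal round-trip sketch of the Writable contract above (using Hadoop's
org.apache.hadoop.io.DataOutputBuffer/DataInputBuffer helpers; populating
the schema via processTriple beforehand is assumed):

    // Serialize a schema, then read it back into a fresh instance.
    SchemaWritable original = new SchemaWritable();
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    SchemaWritable copy = new SchemaWritable();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);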

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
new file mode 100644
index 0000000..2084e05
--- /dev/null
+++ b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java
@@ -0,0 +1,512 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.SKOS;
+
+public class LocalReasonerTest {
+    private LocalReasoner reasoner;
+    private Schema schema;
+
+    /**
+     * Set up a fresh schema and reasoner before each test; individual tests
+     * load whatever schema triples they need.
+     */
+    @Before
+    public void loadSchema() {
+        schema = new Schema();
+        reasoner = new LocalReasoner(TestUtils.NODE, schema, 1, 0);
+    }
+
+    /**
+     * cax-sco
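+     * (c1 rdfs:subClassOf c2) ^ (x rdf:type c1) -> (x rdf:type c2)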
+     */
+    @Test
+    public void testInferSuperclass() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("Professor"),
+            RDFS.SUBCLASSOF, TestUtils.uri("Faculty")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("Professor")));
+        Assert.assertTrue("Type not derived from subclass",
+            reasoner.types.knownTypes.containsKey(TestUtils.uri("Faculty")));
+    }
+
+    /**
+     * prp-dom
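+     * (p rdfs:domain c) ^ (x p y) -> (x rdf:type c)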
+     */
+    @Test
+    public void testInferDomain() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("hasAlumnus"),
+            RDFS.DOMAIN, TestUtils.uri("University")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasAlumnus"), TestUtils.uri("John Doe")));
+        Assert.assertTrue("Type not derived from rdfs:domain",
+            reasoner.types.knownTypes.containsKey(TestUtils.uri("University")));
+    }
+
+    /**
+     * prp-rng
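+     * (p rdfs:range c) ^ (x p y) -> (y rdf:type c)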
+     */
+    @Test
+    public void testInferRange() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("advisor"),
+            RDFS.RANGE, TestUtils.uri("Professor")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+            TestUtils.uri("advisor"), TestUtils.NODE));
+        Assert.assertTrue("Type not derived from rdfs:range",
+            reasoner.types.knownTypes.containsKey(TestUtils.uri("Professor")));
+    }
+
+    /**
+     * cls-nothing2
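+     * (x rdf:type owl:Nothing) -> inconsistent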
+     */
+    @Test
+    public void testNothingInconsistent() throws Exception {
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, OWL.NOTHING));
+        Assert.assertTrue("rdf:type owl:Nothing should be inconsistent",
+            reasoner.hasInconsistencies());
+    }
+
+    /**
+     * prp-inv1
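+     * (p1 owl:inverseOf p2) ^ (x p1 y) -> (y p2 x)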
+     */
+    @Test
+    public void testInverseProperty1() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("memberOf"),
+            OWL.INVERSEOF, TestUtils.uri("member")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("memberOf"), TestUtils.NODE));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(TestUtils.uri("member"))
+                && t.getObject().equals(TestUtils.uri("y"))) {
+                return;
+            }
+        }
+        Assert.fail("Should have derived inverse triple");
+    }
+
+    /**
+     * prp-inv2
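+     * (p1 owl:inverseOf p2) ^ (x p2 y) -> (y p1 x)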
+     */
+    @Test
+    public void testInverseProperty2() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("memberOf"),
+            OWL.INVERSEOF, TestUtils.uri("member")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("member"), TestUtils.NODE));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(TestUtils.uri("memberOf"))
+                && t.getObject().equals(TestUtils.uri("y"))) {
+                return;
+            }
+        }
+        Assert.fail("Should have derived inverse triple");
+    }
+
+    /**
+     * prp-spo1
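+     * (p1 rdfs:subPropertyOf p2) ^ (x p1 y) -> (x p2 y)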
+     */
+    @Test
+    public void testInferSuperproperty() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("headOf"),
+            RDFS.SUBPROPERTYOF, TestUtils.uri("worksFor")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("headOf"), TestUtils.uri("org")));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(TestUtils.uri("worksFor"))
+                && t.getObject().equals(TestUtils.uri("org"))) {
+                return;
+            }
+        }
+        Assert.fail("Superproperty not inferred from subproperty");
+    }
+
+    /**
+     * prp-symp
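+     * (p rdf:type owl:SymmetricProperty) ^ (x p y) -> (y p x)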
+     */
+    @Test
+    public void testSymmetry() throws Exception {
+        schema.processTriple(TestUtils.statement(SKOS.RELATED, RDF.TYPE,
+            OWL.SYMMETRICPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"), SKOS.RELATED,
+            TestUtils.NODE));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(SKOS.RELATED)
+                && t.getObject().equals(TestUtils.uri("y"))) {
+                return;
+            }
+        }
+        Assert.fail("Symmetric property not inferred");
+    }
+
+    /**
+     * cls-com
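+     * (c1 owl:complementOf c2) ^ (x rdf:type c1) ^ (x rdf:type c2) -> inconsistent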
+     */
+    @Test
+    public void testComplementaryClasses() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("A"),
+            OWL.COMPLEMENTOF, TestUtils.uri("NotA")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("A")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("NotA")));
+        Assert.assertTrue("Complementary class membership not detected",
+            reasoner.hasInconsistencies());
+    }
+
+    /**
+     * cax-dw
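+     * (c1 owl:disjointWith c2) ^ (x rdf:type c1) ^ (x rdf:type c2) -> inconsistent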
+     */
+    @Test
+    public void testDisjointClasses() throws Exception {
+        schema.processTriple(TestUtils.statement(SKOS.CONCEPT, OWL.DISJOINTWITH,
+            SKOS.COLLECTION));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            SKOS.CONCEPT));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            SKOS.COLLECTION));
+        Assert.assertTrue("Disjoint class membership not detected",
+            reasoner.hasInconsistencies());
+    }
+
+    /**
+     * prp-trp
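+     * (p rdf:type owl:TransitiveProperty) ^ (x p y) ^ (y p z) -> (x p z)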
+     */
+    @Test
+    public void testTransitivePropertyIncomingOutgoing() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("subOrganizationOf"),
+            RDF.TYPE, OWL.TRANSITIVEPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("subOrganizationOf"), TestUtils.NODE));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("subOrganizationOf"), TestUtils.uri("z")));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("y"))
+                && t.getPredicate().equals(TestUtils.uri("subOrganizationOf"))
+                && t.getObject().equals(TestUtils.uri("z"))) {
+                return;
+            }
+        }
+        Assert.fail("Transitive relation not inferred (received incoming edge first)");
+    }
+    @Test
+    public void testTransitivePropertyOutgoingIncoming() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("subOrganizationOf"),
+            RDF.TYPE, OWL.TRANSITIVEPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("subOrganizationOf"), TestUtils.uri("z")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("subOrganizationOf"), TestUtils.NODE));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("y"))
+                && t.getPredicate().equals(TestUtils.uri("subOrganizationOf"))
+                && t.getObject().equals(TestUtils.uri("z"))) {
+                Assert.fail("Transitive relation should not be inferred "
+                + "(received outgoing edge first)");
+            }
+        }
+    }
+
+    /**
+     * prp-irp
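+     * (p rdf:type owl:IrreflexiveProperty) ^ (x p x) -> inconsistent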
+     */
+    @Test
+    public void testIrreflexiveProperty() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("hasParent"),
+            RDF.TYPE, OWL2.IRREFLEXIVEPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasParent"), TestUtils.NODE));
+        Assert.assertTrue("Relation to self via irreflexive property not detected",
+            reasoner.hasInconsistencies());
+    }
+
+    /**
+     * prp-asyp
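+     * (p rdf:type owl:AsymmetricProperty) ^ (x p y) ^ (y p x) -> inconsistent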
+     */
+    @Test
+    public void testAsymmetricPropertyIncomingOutgoing() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"),
+            RDF.TYPE, OWL2.ASYMMETRICPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("hasChild"), TestUtils.NODE));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasChild"), TestUtils.uri("y")));
+        Assert.assertTrue("Symmetric relation with asymmetric property not detected"
+            + " (receiving incoming edge first)", reasoner.hasInconsistencies());
+    }
+    @Test
+    public void testAsymmetricPropertyReverseOutgoingIncoming() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"), RDF.TYPE,
+            OWL2.ASYMMETRICPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasChild"), TestUtils.uri("y")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("y"),
+            TestUtils.uri("hasChild"), TestUtils.NODE));
+        Assert.assertFalse("Symmetric relation with asymmetric property should"
+            + " not be detected when receiving outgoing edge first",
+            reasoner.hasInconsistencies());
+    }
+    @Test
+    public void testAsymmetricPropertyReflexive() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("hasChild"),
+            RDF.TYPE, OWL2.ASYMMETRICPROPERTY));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasChild"), TestUtils.NODE));
+        Assert.assertTrue("Self-referential relation with asymmetric property"
+            + " not detected", reasoner.hasInconsistencies());
+    }
+
+    /**
+     * prp-pdw
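+     * (p1 owl:propertyDisjointWith p2) ^ (x p1 y) ^ (x p2 y) -> inconsistent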
+     */
+    @Test
+    public void testDisjointProperties() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("p1"),
+            OWL2.PROPERTYDISJOINTWITH, TestUtils.uri("p2")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("p1"), TestUtils.uri("y")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("p2"), TestUtils.uri("y")));
+        Assert.assertTrue("Disjoint property usage not detected (left-hand side"
+            + " of propertyDisjointWith received first)", reasoner.hasInconsistencies());
+    }
+    @Test
+    public void testDisjointPropertiesReverse() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("p2"),
+            OWL2.PROPERTYDISJOINTWITH, TestUtils.uri("p1")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("p1"), TestUtils.uri("y")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("p2"), TestUtils.uri("y")));
+        Assert.assertTrue("Disjoint property usage not detected (right-hand side"
+            + " of propertyDisjointWith received first)", reasoner.hasInconsistencies());
+    }
+
+    /**
+     * cls-svf1
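+     * (x owl:someValuesFrom c) ^ (x owl:onProperty p) ^ (u p v) ^ (v rdf:type c) -> (u rdf:type x)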
+     */
+    @Test
+    public void testSomeValuesFromMembership() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.SOMEVALUESFROM, TestUtils.uri("Department")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("headOf")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("Department")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+            TestUtils.uri("headOf"), TestUtils.NODE));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("John Doe"))
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("x"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:someValuesFrom c]"
+            + " on property p;  (z type c) then (y p z) should imply"
+            + " (y type x).");
+    }
+    @Test
+    public void testSomeValuesFromMembershipReverse() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.SOMEVALUESFROM, TestUtils.uri("Department")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("headOf")));
+        reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"),
+            TestUtils.uri("headOf"), TestUtils.NODE));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("Department")));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("John Doe"))
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("x"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:someValuesFrom c]"
+            + " on property p; (y p z) then (z type c) should imply"
+            + " (y type x).");
+    }
+
+    /**
+     * cls-svf2
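+     * (x owl:someValuesFrom owl:Thing) ^ (x owl:onProperty p) ^ (u p v) -> (u rdf:type x)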
+     */
+    @Test
+    public void testSomeValuesFromThing() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.SOMEVALUESFROM, OWL.THING));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("foo")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("foo"), TestUtils.uri("bar")));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("x"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:someValuesFrom owl:Thing]"
+            + " on property p; (y p z) should imply (y type x) for any z.");
+    }
+
+    /**
+     * cls-hv1
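+     * (x owl:hasValue y) ^ (x owl:onProperty p) ^ (u rdf:type x) -> (u p y)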
+     */
+    @Test
+    public void testHasValueInferValue() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"), OWL.HASVALUE,
+            TestUtils.uri("y")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("p")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("x")));
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(TestUtils.uri("p"))
+                && t.getObject().equals(TestUtils.uri("y"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:hasValue y]"
+            + " on property p; (u type x) should imply (u p y) for any u.");
+    }
+
+    /**
+     * cls-hv2
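+     * (x owl:hasValue y) ^ (x owl:onProperty p) ^ (u p y) -> (u rdf:type x)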
+     */
+    @Test
+    public void testHasValueInferClass() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"), OWL.HASVALUE,
+            TestUtils.uri("y")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("p")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("p"), TestUtils.uri("y")));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.NODE)
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("x"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:hasValue y]"
+            + " on property p; (u p y) should imply (u type x) for any u.");
+    }
+
+    /**
+     * cls-avf
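+     * (x owl:allValuesFrom c) ^ (x owl:onProperty p) ^ (u rdf:type x) ^ (u p v) -> (v rdf:type c)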
+     */
+    @Test
+    public void testAllValuesFrom() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ALLVALUESFROM, TestUtils.uri("Human")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("hasParent")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasParent"), TestUtils.uri("v")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("x")));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("v"))
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("Human"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:allValuesFrom c]"
+            + " on property p; (u p v) then (u type x) should imply (v type c).");
+    }
+    @Test
+    public void testAllValuesFromReverse() throws Exception {
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ALLVALUESFROM, TestUtils.uri("Human")));
+        schema.processTriple(TestUtils.statement(TestUtils.uri("x"),
+            OWL.ONPROPERTY, TestUtils.uri("hasParent")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE,
+            TestUtils.uri("x")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE,
+            TestUtils.uri("hasParent"), TestUtils.uri("v")));
+        reasoner.getTypes();
+        for (Fact t : reasoner.getFacts()) {
+            if (t.getSubject().equals(TestUtils.uri("v"))
+                && t.getPredicate().equals(RDF.TYPE)
+                && t.getObject().equals(TestUtils.uri("Human"))) {
+                return;
+            }
+        }
+        Assert.fail("If x is a property restriction [owl:allValuesFrom c]"
+            + " on property p; (u type x) then (u p v) should imply (v type c).");
+    }
+
+    /**
+     * cls-maxc1
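+     * (x owl:maxCardinality 0) ^ (x owl:onProperty p) ^ (u rdf:type x) ^ (u p y) -> inconsistent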
+     */
+    @Test
+    public void testMaxCardinalityZero() throws Exception {
+        URI r = TestUtils.uri("restriction");
+        URI p = TestUtils.uri("impossiblePredicate1");
+        schema.processTriple(TestUtils.statement(r, OWL.MAXCARDINALITY,
+            TestUtils.intLiteral("0")));
+        schema.processTriple(TestUtils.statement(r, OWL.ONPROPERTY, p));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p,
+            TestUtils.uri("y")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, r));
+        Assert.assertTrue("If p has maxCardinality of 0; then any (x p y)"
+            + " should be found inconsistent", reasoner.hasInconsistencies());
+    }
+
+    /**
+     * cls-maxqc2
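+     * (x owl:maxQualifiedCardinality 0) ^ (x owl:onProperty p) ^ (x owl:onClass owl:Thing)
+     * ^ (u rdf:type x) ^ (u p y) -> inconsistent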
+     */
+    @Test
+    public void testMaxQCardinalityZeroThings() throws Exception {
+        Resource r = TestUtils.bnode("restriction");
+        URI p = TestUtils.uri("impossiblePredicate2");
+        schema.processTriple(TestUtils.statement(r, OWL2.MAXQUALIFIEDCARDINALITY,
+            TestUtils.intLiteral("0")));
+        schema.processTriple(TestUtils.statement(r, OWL.ONPROPERTY, p));
+        schema.processTriple(TestUtils.statement(r, OWL2.ONCLASS, OWL.THING));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p,
+            TestUtils.uri("y")));
+        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, r));
+        Assert.assertTrue("If p has maxQualifiedCardinality of 0 on owl:Thing;"
+            + " then any (x p y) should be found inconsistent",
+            reasoner.hasInconsistencies());
+    }
+}