Posted to commits@rya.apache.org by pu...@apache.org on 2016/06/21 17:11:42 UTC

[3/5] incubator-rya git commit: Included project rya.reasoning.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
new file mode 100644
index 0000000..510fc75
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
@@ -0,0 +1,538 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.OWL;
+
+/**
+ * Hold on to facts about the schema (TBox/RBox) and perform what reasoning we
+ * can without instance data.
+ * <p>
+ * The Schema object, together with the OwlClass and OwlProperty objects it
+ * keeps track of, is responsible for schema reasoning, or the "Semantics of
+ * Schema Vocabulary" rules from the OWL RL/RDF specification. Some rules are
+ * handled dynamically, while the rest must be computed by calling closure()
+ * once the schema data has been read in.
+ * <p>
+ * Schema rules implemented in {@link OwlClass}:
+ *      scm-cls, scm-eqc1, scm-eqc2, scm-sco,
+ *      scm-hv, scm-svf2, scm-avf2
+ * <p>
+ * Schema rules implemented in {@link OwlProperty}:
+ *      scm-op, scm-dp, scm-eqp1, scm-eqp2, scm-spo, scm-dom1, scm-dom2,
+ *      scm-rng1, scm-rng2, scm-svf1, scm-avf1
+ * <p>
+ * TODO: scm-cls officially states owl:Nothing is a subclass of every class.
+ *  Do we need to explicitly do something with this fact?
+ */
+public class Schema {
+    // Statements using these predicates are automatically relevant schema
+    // information.
+    private static final Set<URI> schemaPredicates = new HashSet<>();
+    private static final URI[] schemaPredicateURIs = {
+        RDFS.SUBCLASSOF,
+        RDFS.SUBPROPERTYOF,
+        RDFS.DOMAIN,
+        RDFS.RANGE,
+        OWL.EQUIVALENTCLASS,
+        OWL.EQUIVALENTPROPERTY,
+        OWL.INVERSEOF,
+        OWL.DISJOINTWITH,
+        OWL.COMPLEMENTOF,
+        OWL.ONPROPERTY,
+        OWL.SOMEVALUESFROM,
+        OWL.ALLVALUESFROM,
+        OWL.HASVALUE,
+        OWL.MAXCARDINALITY,
+        OWL2.MAXQUALIFIEDCARDINALITY,
+        OWL2.PROPERTYDISJOINTWITH,
+        OWL2.ONCLASS
+    };
+
+    // The fact that something is one of these types is schema information.
+    private static final Set<Resource> schemaTypes = new HashSet<>();
+    private static final Resource[] schemaTypeURIs = {
+        OWL.TRANSITIVEPROPERTY,
+        OWL2.IRREFLEXIVEPROPERTY,
+        OWL.SYMMETRICPROPERTY,
+        OWL2.ASYMMETRICPROPERTY,
+        OWL.FUNCTIONALPROPERTY,
+        OWL.INVERSEFUNCTIONALPROPERTY
+    };
+
+    static {
+        for (URI uri : schemaPredicateURIs) {
+            schemaPredicates.add(uri);
+        }
+        for (Resource uri : schemaTypeURIs) {
+            schemaTypes.add(uri);
+        }
+    }
+
+    /**
+     * Does this triple/statement encode potentially relevant schema
+     * information?
+     */
+    public static boolean isSchemaTriple(Statement triple) {
+        URI pred = triple.getPredicate();
+        // Triples with certain predicates are schema triples,
+        if (schemaPredicates.contains(pred)) {
+            return true;
+        }
+        // And certain type assertions are schema triples.
+        else if (pred.equals(RDF.TYPE)) {
+            if (schemaTypes.contains(triple.getObject())) {
+                return true;
+            }
+        }
+        return false;
+    }
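+
+    /*
+     * Illustrative sketch (not part of the original logic), assuming Sesame's
+     * ValueFactoryImpl and hypothetical URIs:
+     *   ValueFactory vf = ValueFactoryImpl.getInstance();
+     *   URI a = vf.createURI("urn:example:A");
+     *   URI b = vf.createURI("urn:example:B");
+     *   Schema.isSchemaTriple(vf.createStatement(a, RDFS.SUBCLASSOF, b)); // true
+     *   Schema.isSchemaTriple(vf.createStatement(a, RDF.TYPE, b));        // false
+     */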
+
+    /**
+     * Map URIs to schema information about a property
+     */
+    protected Map<URI, OwlProperty> properties = new HashMap<>();
+
+    /**
+     * Map Resources to schema information about a class/restriction
+     */
+    protected Map<Resource, OwlClass> classes = new HashMap<>();
+
+    /**
+     * Get schema information for a class, for reading and writing.
+     * Instantiates OwlClass if it doesn't yet exist.
+     */
+    public OwlClass getClass(Resource c) {
+        if (!classes.containsKey(c)) {
+            classes.put(c, new OwlClass(c));
+        }
+        return classes.get(c);
+    }
+
+    /**
+     * Get schema information for a class, for reading and writing.
+     * Assumes this Value refers to a class Resource.
+     */
+    public OwlClass getClass(Value c) {
+        return getClass((Resource) c);
+    }
+
+    /**
+     * Get schema information for a property, for reading and writing.
+     * Instantiates OwlProperty if it doesn't yet exist.
+     */
+    public OwlProperty getProperty(URI p) {
+        if (!properties.containsKey(p)) {
+            properties.put(p, new OwlProperty(p));
+        }
+        return properties.get(p);
+    }
+
+    /**
+     * Get schema information for a property, for reading and writing.
+     * Assumes this Value refers to a property URI.
+     */
+    public OwlProperty getProperty(Value p) {
+        return getProperty((URI) p);
+    }
+
+    /**
+     * Return whether this resource corresponds to a property.
+     */
+    public boolean hasProperty(URI r) {
+        return properties.containsKey(r);
+    }
+
+    /**
+     * Return whether this resource corresponds to a class.
+     */
+    public boolean hasClass(Resource r) {
+        return classes.containsKey(r);
+    }
+
+    /**
+     * Return whether this resource corresponds to a property restriction.
+     */
+    public boolean hasRestriction(Resource r) {
+        return classes.containsKey(r) && !classes.get(r).getOnProperty().isEmpty();
+    }
+
+    public Schema() {
+    }
+
+    /**
+     * Incorporate a new triple into the schema.
+     */
+    public void processTriple(Statement triple) {
+        Resource s = triple.getSubject();
+        URI p = triple.getPredicate();
+        Value o = triple.getObject();
+        if (isSchemaTriple(triple)) {
+            // For a type statement to be schema information, it must yield
+            // some boolean information about a property.
+            if (p.equals(RDF.TYPE)) {
+                if (schemaTypes.contains(o)) {
+                    addPropertyType((URI) s, (Resource) o);
+                }
+            }
+
+            // Domain/range
+            else if (p.equals(RDFS.DOMAIN)) {
+                // Don't add trivial domain owl:Thing
+                if (!o.equals(OWL.THING)) {
+                    getProperty(s).addDomain(getClass(o));
+                }
+            }
+            else if (p.equals(RDFS.RANGE)) {
+                // Don't add trivial range owl:Thing
+                if (!o.equals(OWL.THING)) {
+                    getProperty(s).addRange(getClass(o));
+                }
+            }
+
+            // Sub/super relations
+            else if (p.equals(RDFS.SUBCLASSOF)) {
+                // Everything is a subclass of owl:Thing; we don't need to
+                // store that information.
+                if (!o.equals(OWL.THING)) {
+                    getClass(s).addSuperClass(getClass(o));
+                }
+            }
+            else if (p.equals(RDFS.SUBPROPERTYOF)) {
+                getProperty(s).addSuperProperty(getProperty(o));
+            }
+
+            // Equivalence relations
+            else if (p.equals(OWL.EQUIVALENTCLASS)) {
+                getClass(s).addEquivalentClass(getClass(o));
+            }
+            else if (p.equals(OWL.EQUIVALENTPROPERTY)) {
+                getProperty(s).addEquivalentProperty(getProperty(o));
+            }
+
+            // Inverse properties
+            else if (p.equals(OWL.INVERSEOF)) {
+                getProperty(s).addInverse(getProperty(o));
+                getProperty(o).addInverse(getProperty(s));
+            }
+
+            // Complementary classes
+            else if (p.equals(OWL.COMPLEMENTOF)) {
+                getClass(s).addComplement(getClass(o));
+                getClass(o).addComplement(getClass(s));
+            }
+
+            // Disjoint classes and properties
+            else if (p.equals(OWL.DISJOINTWITH)) {
+                getClass(s).addDisjoint(getClass(o));
+                getClass(o).addDisjoint(getClass(s));
+            }
+            else if (p.equals(OWL2.PROPERTYDISJOINTWITH)) {
+                getProperty(s).addDisjoint(getProperty(o));
+                getProperty(o).addDisjoint(getProperty(s));
+            }
+
+            // Property restriction info
+            else if (p.equals(OWL.ONPROPERTY)) {
+                getClass(s).addProperty(getProperty(o));
+            }
+            else if (p.equals(OWL.SOMEVALUESFROM)) {
+                getClass(s).addSvf(getClass(o));
+            }
+            else if (p.equals(OWL.ALLVALUESFROM)) {
+                getClass(s).addAvf(getClass(o));
+            }
+            else if (p.equals(OWL2.ONCLASS)) {
+                getClass(s).addClass(getClass(o));
+            }
+            else if (p.equals(OWL.HASVALUE)) {
+                getClass(s).addValue(o);
+            }
+            else if (p.equals(OWL.MAXCARDINALITY)) {
+                getClass(s).setMaxCardinality(o);
+            }
+            else if (p.equals(OWL2.MAXQUALIFIEDCARDINALITY)) {
+                getClass(s).setMaxQualifiedCardinality(o);
+            }
+        }
+    }
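+
+    /*
+     * Illustrative sketch: a property restriction is assembled from several
+     * schema triples routed through processTriple (names are hypothetical):
+     *   _:r    owl:onProperty     ex:hasPart  ->  getClass(_:r).addProperty(...)
+     *   _:r    owl:someValuesFrom ex:Wheel    ->  getClass(_:r).addSvf(...)
+     *   ex:Car rdfs:subClassOf    _:r         ->  getClass(ex:Car).addSuperClass(...)
+     */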
+
+    /**
+     * Add a particular characteristic to a property.
+     */
+    private void addPropertyType(URI p, Resource t) {
+        OwlProperty prop = getProperty(p);
+        if (t.equals(OWL.TRANSITIVEPROPERTY)) {
+            prop.setTransitive();
+        }
+        else if (t.equals(OWL.SYMMETRICPROPERTY)) {
+            prop.setSymmetric();
+        }
+        else if (t.equals(OWL2.ASYMMETRICPROPERTY)) {
+            prop.setAsymmetric();
+        }
+        else if (t.equals(OWL.FUNCTIONALPROPERTY)) {
+            prop.setFunctional();
+        }
+        else if (t.equals(OWL.INVERSEFUNCTIONALPROPERTY)) {
+            prop.setInverseFunctional();
+        }
+        else if (t.equals(OWL2.IRREFLEXIVEPROPERTY)) {
+            prop.setIrreflexive();
+        }
+    }
+
+    /**
+     * Perform schema-level reasoning to compute the closure of statements
+     * already represented in this schema. This includes things like subClassOf
+     * transitivity and applying domain/range to subclasses.
+     */
+    public void closure() {
+        // RL rule scm-spo: subPropertyOf transitivity
+        // (takes in subproperty info; yields subproperty info)
+        for (OwlProperty subprop : properties.values()) {
+            subprop.computeSuperProperties();
+        }
+
+        // RL rules scm-hv, scm-svf2, scm-avf2: restrictions & subproperties
+        // (take in subproperty info & prop. restrictions; yield subclass info)
+        for (OwlClass c1 : classes.values()) {
+            for (OwlClass c2 : classes.values()) {
+                c1.compareRestrictions(c2);
+            }
+        }
+
+        // The following two steps can affect each other, so repeat the block
+        // as many times as necessary.
+        boolean repeat;
+        do {
+            // RL rule scm-sco: subClassOf transitivity
+            // (takes in subclass info; yields subclass info)
+            // (This traverses the complete hierarchy, so we don't need to loop
+            // again if changes are only made in this step)
+            for (OwlClass subclass : classes.values()) {
+                subclass.computeSuperClasses();
+            }
+            // RL rules scm-svf1, scm-avf1: property restrictions & subclasses
+            // (take in subclass info & prop. restrictions; yield subclass info)
+            // (If changes are made here, loop through both steps again)
+            repeat = false;
+            for (OwlProperty prop : properties.values()) {
+                repeat = prop.compareRestrictions() || repeat;
+            }
+        } while (repeat);
+
+        // Apply RL rules scm-dom1, scm-rng1, scm-dom2, scm-rng2:
+        // (take in subclass/subproperty & domain/range; yield domain/range)
+        for (OwlProperty prop : properties.values()) {
+            prop.inheritDomainRange();
+        }
+    }
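+
+    /*
+     * Typical lifecycle (a sketch; `statements` is a hypothetical collection
+     * of input triples):
+     *   Schema schema = new Schema();
+     *   for (Statement st : statements) {
+     *       if (Schema.isSchemaTriple(st)) {
+     *           schema.processTriple(st);
+     *       }
+     *   }
+     *   schema.closure(); // required before querying derived relationships
+     *   boolean entailed = schema.containsTriple(someSchemaTriple);
+     */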
+
+    /**
+     * Determine whether a fact is contained in the Schema object
+     * relationships or implied by schema rules.
+     * @return  True if this schema contains the semantics of the triple
+     */
+    public boolean containsTriple(Statement triple) {
+        // The schema certainly doesn't contain it if it's not a
+        // schema-relevant triple at all.
+        if (isSchemaTriple(triple)) {
+            Resource s = triple.getSubject();
+            URI p = triple.getPredicate();
+            Value o = triple.getObject();
+            // If this is telling us something about a property:
+            if (properties.containsKey(s)) {
+                OwlProperty prop = properties.get(s);
+                // Property types:
+                if (p.equals(RDF.TYPE)) {
+                    if ((o.equals(OWL.TRANSITIVEPROPERTY)
+                            && prop.isTransitive())
+                        || (o.equals(OWL2.IRREFLEXIVEPROPERTY)
+                            && prop.isIrreflexive())
+                        || (o.equals(OWL.SYMMETRICPROPERTY)
+                            && prop.isSymmetric())
+                        || (o.equals(OWL2.ASYMMETRICPROPERTY)
+                            && prop.isAsymmetric())
+                        || (o.equals(OWL.FUNCTIONALPROPERTY)
+                            && prop.isFunctional())
+                        || (o.equals(OWL.INVERSEFUNCTIONALPROPERTY)
+                            && prop.isInverseFunctional())) {
+                        return true;
+                    }
+                }
+                // Relationships with other properties:
+                if ((p.equals(RDFS.SUBPROPERTYOF)
+                        && prop.getSuperProperties().contains(o))
+                    || (p.equals(OWL2.PROPERTYDISJOINTWITH)
+                        && prop.getDisjointProperties().contains(o))
+                    || (p.equals(OWL.EQUIVALENTPROPERTY)
+                        && prop.getEquivalentProperties().contains(o))
+                    || (p.equals(OWL.INVERSEOF)
+                        && prop.getInverseProperties().contains(o))) {
+                    return true;
+                }
+                // Relationships with classes:
+                if ((p.equals(RDFS.DOMAIN)
+                        && prop.getDomain().contains(o))
+                    || (p.equals(RDFS.RANGE)
+                        && prop.getRange().contains(o))) {
+                    return true;
+                }
+            }
+            // If this is about a class relationship:
+            if (classes.containsKey(s)) {
+                OwlClass subject = classes.get(s);
+                if ((p.equals(OWL.EQUIVALENTCLASS)
+                        && (subject.getEquivalentClasses().contains(o)))
+                    || (p.equals(OWL.DISJOINTWITH)
+                        && (subject.getDisjointClasses().contains(o)))
+                    || (p.equals(OWL.COMPLEMENTOF)
+                        && (subject.getComplementaryClasses().contains(o)))
+                    || (p.equals(RDFS.SUBCLASSOF)
+                        && (subject.getSuperClasses().contains(o)))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Collect and return counts of different kinds of schema constructs
+     */
+    public String getSummary() {
+        int nRestrictions = 0;
+        for (Resource r : classes.keySet()) {
+            OwlClass c = classes.get(r);
+            if (!c.getOnProperty().isEmpty()) {
+                nRestrictions++;
+            }
+        }
+        int nClasses = classes.size();
+        int nProperties = properties.size();
+        String[] pTypes = { "Transitive", "Symmetric", "Asymmetric",
+            "Functional", "Inverse Functional", "Irreflexive" };
+        String[] rTypes = { "someValuesFrom", "allValuesFrom", "hasValue",
+            "maxCardinality==0", "maxCardinality>0",
+            "maxQualifiedCardinality==0", "maxQualifiedCardinality>0", };
+        String[] edgeTypes = { "Superclass", "Disjoint class", "Complement",
+            "Superproperty", "Disjoint property", "Inverse property",
+            "Domain", "Range",
+            "Equivalent class", "Equivalent property"};
+        int[] pTotals = { 0, 0, 0, 0, 0, 0 };
+        int[] rTotals = { 0, 0, 0, 0, 0, 0, 0 };
+        int[] edgeTotals = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+        for (OwlClass c : classes.values()) {
+            // Subtract trivial edges: the superclass set appears to include
+            // the class itself and owl:Thing, and the equivalent-class set
+            // appears to include the class itself.
+            edgeTotals[0] += c.getSuperClasses().size() - 2;
+            edgeTotals[1] += c.getDisjointClasses().size();
+            edgeTotals[2] += c.getComplementaryClasses().size();
+            edgeTotals[8] += c.getEquivalentClasses().size() - 1;
+            if (!c.someValuesFrom().isEmpty()) rTotals[0]++;
+            if (!c.allValuesFrom().isEmpty()) rTotals[1]++;
+            if (!c.hasValue().isEmpty()) rTotals[2]++;
+            if (c.getMaxCardinality() == 0) rTotals[3]++;
+            if (c.getMaxCardinality() > 0) rTotals[4]++;
+            if (c.getMaxQualifiedCardinality() == 0) rTotals[5]++;
+            if (c.getMaxQualifiedCardinality() > 0) rTotals[6]++;
+        }
+        for (OwlProperty p : properties.values()) {
+            if (p.isTransitive()) pTotals[0]++;
+            if (p.isSymmetric()) pTotals[1]++;
+            if (p.isAsymmetric()) pTotals[2]++;
+            if (p.isFunctional()) pTotals[3]++;
+            if (p.isInverseFunctional()) pTotals[4]++;
+            if (p.isIrreflexive()) pTotals[5]++;
+            // (The superproperty set appears to include the property itself.)
+            edgeTotals[3] += p.getSuperProperties().size() - 1;
+            edgeTotals[4] += p.getDisjointProperties().size();
+            edgeTotals[5] += p.getInverseProperties().size();
+            edgeTotals[6] += p.getDomain().size();
+            edgeTotals[7] += p.getRange().size();
+            edgeTotals[9] += p.getEquivalentProperties().size();
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("Schema summary:");
+        sb.append("\n\tClasses: " + nClasses);
+        sb.append("\n\t\tProperty Restrictions: ").append(nRestrictions);
+        for (int i = 0; i < rTypes.length; i++) {
+            sb.append("\n\t\t\t");
+            sb.append(rTypes[i]).append(": ").append(rTotals[i]);
+        }
+        sb.append("\n\t\tOther: ").append(nClasses-nRestrictions);
+        sb.append("\n\tProperties: ").append(nProperties);
+        for (int i = 0; i < pTypes.length; i++) {
+            sb.append("\n\t\t");
+            sb.append(pTypes[i]).append(": ").append(pTotals[i]);
+        }
+        sb.append("\n\tConnections:");
+        for (int i = 0; i < edgeTypes.length; i++) {
+            sb.append("\n\t\t");
+            sb.append(edgeTypes[i]).append(": ").append(edgeTotals[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Assuming a given resource corresponds to a property restriction,
+     * describe the restriction.
+     */
+    public String explainRestriction(Resource type) {
+        StringBuilder sb = new StringBuilder();
+        if (classes.containsKey(type)) {
+            OwlClass pr = classes.get(type);
+            sb.append("owl:Restriction");
+            for (URI p : pr.getOnProperty()) {
+                sb.append(" (owl:onProperty ").append(p.toString()).append(")");
+            }
+            for (Value v : pr.hasValue()) {
+                sb.append(" (owl:hasValue ").append(v.toString()).append(")");
+            }
+            for (Resource c : pr.someValuesFrom()) {
+                sb.append(" (owl:someValuesFrom ").append(c.toString()).append(")");
+            }
+            for (Resource c : pr.allValuesFrom()) {
+                sb.append(" (owl:allValuesFrom ").append(c.toString()).append(")");
+            }
+            int mc = pr.getMaxCardinality();
+            int mqc = pr.getMaxQualifiedCardinality();
+            if (mc >= 0) {
+                sb.append(" (owl:maxCardinality ").append(mc).append(")");
+            }
+            if (mqc >= 0) {
+                sb.append(" (owl:maxQualifiedCardinality ").append(mqc).append(")");
+            }
+            // owl:onClass accompanies qualified cardinality restrictions;
+            // print it as its own group so parentheses always balance.
+            for (Resource c : pr.onClass()) {
+                sb.append(" (owl:onClass ").append(c.toString()).append(")");
+            }
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
new file mode 100644
index 0000000..1d1ac8f
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
@@ -0,0 +1,234 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+
+/**
+ * Keep track of a single node's types and do reasoning about its types.
+ */
+public class TypeReasoner extends AbstractReasoner {
+    // This node's types, whether asserted or derived
+    Map<Resource, Fact> knownTypes = new HashMap<>();
+
+    // Inferences waiting for particular types to be discovered
+    Map<Resource, List<Fact>> possibleInferences = new HashMap<>();
+    Map<Resource, List<Derivation>> possibleInconsistencies = new HashMap<>();
+
+    /**
+     * Constructor.
+     * @param   node    Conduct reasoning about/around this node
+     * @param   schema  Global schema (class/property) information
+     * @param   t       Iteration # of new triples
+     * @param   tSchema Iteration # of latest schema change
+     */
+    public TypeReasoner(Resource node, Schema schema, int t, int tSchema) {
+        super(node, schema, t, tSchema);
+    }
+
+    /**
+     * Process a type (class) assignment from the input. It may have been
+     * inferred during a previous iteration.
+     * @param typeFact  An assertion about one of this node's types
+     */
+    void processType(Fact typeFact) {
+        Resource type = (Resource) typeFact.getObject();
+        boolean newType = !knownTypes.containsKey(type);
+        int t = typeFact.getIteration();
+        // Save the type in memory unless older knowledge takes precedence
+        if (newType || t < knownTypes.get(type).getIteration()) {
+            knownTypes.put(type, typeFact);
+            // Perform further inference
+            typeInference(typeFact);
+        }
+    }
+
+    /**
+     * Produce and process a type derivation from this iteration.
+     * TODO: how to implement rules that would produce "literal rdf:type ?x"
+     * @param type      The type itself
+     * @param rule      The generating rule
+     * @param source    The source of the derivation
+     */
+    void processType(Resource type, OwlRule rule, Fact source) {
+        processType(triple(node, RDF.TYPE, type, rule, source));
+    }
+
+    /**
+     * Infer additional information from a type assertion.
+     */
+    void typeInference(Fact typeFact) {
+        Resource type = (Resource) typeFact.getObject();
+        OwlClass c = schema.getClass(type);
+        // RL rule cls-nothing2: Inconsistent if type owl:Nothing
+        // (frontier() skips the check unless this fact is new)
+        if (OWL.NOTHING.equals(type) && frontier(typeFact)) {
+            collectInconsistency(inconsistency(OwlRule.CLS_NOTHING2, typeFact));
+        }
+        // RL rule cax-dw: type shouldn't be disjointWith a previous type
+        Set<Resource> disjoint = c.getDisjointClasses();
+        disjoint.retainAll(knownTypes.keySet());
+        for (Resource other : disjoint) {
+            Fact otherTypeFact = knownTypes.get(other);
+            Derivation inc = inconsistency(OwlRule.CAX_DW, typeFact);
+            inc.addSource(otherTypeFact);
+            collectInconsistency(inc);
+        }
+        // RL rule cls-com: type shouldn't be complementOf a previous type
+        Set<Resource> complementary = c.getComplementaryClasses();
+        complementary.retainAll(knownTypes.keySet());
+        for (Resource other : complementary) {
+            Fact otherTypeFact = knownTypes.get(other);
+            Derivation inc = inconsistency(OwlRule.CLS_COM, typeFact);
+            inc.addSource(otherTypeFact);
+            collectInconsistency(inc);
+        }
+        // RL rule cax-sco: subClassOf semantics (derive superclasses)
+        if (!typeFact.hasRule(OwlRule.CAX_SCO)
+            && frontier(typeFact)) {
+            // Skip if typeFact itself came from this rule, and/or if typeFact
+            // generally shouldn't be the sole source of new information
+            for (Resource supertype : c.getSuperClasses()) {
+                // If the supertype isn't trivial, assert it
+                if (!supertype.equals(type)
+                        && !(supertype.equals(OWL.THING))) {
+                    processType(supertype, OwlRule.CAX_SCO, typeFact);
+                }
+            }
+        }
+        // Apply property restriction rules:
+        for (URI prop : c.getOnProperty()) {
+            // RL rule cls-hv1: if type is an owl:hasValue restriction
+            for (Value val : c.hasValue()) {
+                collect(triple(node, prop, val, OwlRule.CLS_HV1, typeFact));
+            }
+        }
+        // Derive any facts whose explicit condition is this type assignment
+        if (possibleInferences.containsKey(type)) {
+            for (Fact fact : possibleInferences.get(type)) {
+                Fact join = fact.clone();
+                join.addSource(typeFact);
+                collect(join);
+            }
+        }
+        // Derive any inconsistencies whose explicit condition is this type
+        if (possibleInconsistencies.containsKey(type)) {
+            for (Derivation d : possibleInconsistencies.get(type)) {
+                Derivation inc = d.clone();
+                inc.addSource(typeFact);
+                collectInconsistency(inc);
+            }
+        }
+    }
+
+    /**
+     * Assert an arbitrary fact if and when this node is determined to have
+     * a particular type. Facilitates join rules specifically concerning type.
+     */
+    void onType(Resource type, Fact fact) {
+        if (!possibleInferences.containsKey(type)) {
+            possibleInferences.put(type, new LinkedList<Fact>());
+        }
+        possibleInferences.get(type).add(fact);
+        // If we already know the type, assert the fact right away.
+        if (knownTypes.containsKey(type)) {
+            Fact join = fact.clone();
+            join.addSource(knownTypes.get(type));
+            collect(join);
+        }
+    }
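+
+    /*
+     * Illustrative sketch: a join rule (e.g. cls-svf1) can defer a conclusion
+     * on a type it hasn't seen yet (names are hypothetical):
+     *   reasoner.onType(svfClass, conditionalFact);
+     * If svfClass is already known, conditionalFact is emitted immediately;
+     * otherwise it is emitted if/when processType later encounters svfClass.
+     */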
+
+    /**
+     * Assert an inconsistency if and when this node is determined to have
+     * a particular type. Facilitates join rules specifically concerning type.
+     */
+    void inconsistentOnType(Resource type, Derivation fact) {
+        if (!possibleInconsistencies.containsKey(type)) {
+            possibleInconsistencies.put(type, new LinkedList<Derivation>());
+        }
+        possibleInconsistencies.get(type).add(fact);
+        // If we already know the type, assert the fact right away.
+        if (knownTypes.containsKey(type)) {
+            Derivation d = fact.clone();
+            d.addSource(knownTypes.get(type));
+            collectInconsistency(d);
+        }
+    }
+
+    /**
+     * Collect all the type knowledge into the output, if it represents new
+     * information.
+     */
+    void collectTypes() {
+        for (Resource type : knownTypes.keySet()) {
+            collect(knownTypes.get(type));
+        }
+    }
+
+    /**
+     * Get info about types derived and potential inferences.
+     */
+    @Override
+    public String getDiagnostics() {
+        int total = 0;
+        int incTotal = 0;
+        for (Resource uri : possibleInferences.keySet()) {
+            total += possibleInferences.get(uri).size();
+        }
+        for (Resource uri : possibleInconsistencies.keySet()) {
+            incTotal += possibleInconsistencies.get(uri).size();
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(knownTypes.size()).append(" total types known\n");
+        sb.append("Watching for ").append(possibleInferences.size());
+        sb.append(" distinct types to trigger any of ").append(total);
+        sb.append(" possible inferences");
+        sb.append("Watching for ").append(possibleInconsistencies.size());
+        sb.append(" distinct types to trigger any of ").append(incTotal);
+        sb.append(" possible inconsistencies");
+        return sb.toString();
+    }
+
+    /**
+     * Get the total number of input facts cached.
+     */
+    @Override
+    public int getNumStored() {
+        int total = knownTypes.size();
+        for (List<Fact> l : possibleInferences.values()) {
+            total += l.size();
+        }
+        for (List<Derivation> l : possibleInconsistencies.values()) {
+            total += l.size();
+        }
+        return total;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
new file mode 100644
index 0000000..dde83c6
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
@@ -0,0 +1,492 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.accumulo.mr.fileinput.RdfFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.openrdf.rio.RDFFormat;
+
+/**
+ * Contains common functionality for MapReduce jobs involved in reasoning. A
+ * subclass should implement configureReasoningJob and its own mappers and
+ * reducers.
+ */
+abstract public class AbstractReasoningTool extends Configured implements Tool {
+    // Keep track of statistics about the input
+    protected static enum COUNTERS { ABOX, TBOX, USEFUL };
+
+    // MapReduce job, to be configured by subclasses
+    protected Job job;
+
+    /**
+     * Configure the job's inputs, outputs, mappers, and reducers.
+     */
+    abstract protected void configureReasoningJob(String[] args) throws Exception;
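+
+    /*
+     * A minimal subclass might look like this (sketch only; the mapper and
+     * reducer classes named here are hypothetical):
+     *   public class ExampleReasoningJob extends AbstractReasoningTool {
+     *       @Override
+     *       protected void configureReasoningJob(String[] args) throws Exception {
+     *           distributeSchema();
+     *           configureMultipleInput(ExampleTableMapper.class,
+     *               ExampleRdfMapper.class, ExampleFileMapper.class, true);
+     *           job.setReducerClass(ExampleReasoningReducer.class);
+     *           configureDerivationOutput();
+     *       }
+     *   }
+     */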
+
+    /**
+     * Configure and run a MapReduce job.
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Configuration conf = getConf();
+        job = Job.getInstance(conf);
+        job.setJobName(getJobName());
+        job.setJarByClass(this.getClass());
+        configureReasoningJob(args);
+        boolean success = job.waitForCompletion(!MRReasoningUtils.stats(conf));
+        if (success) {
+            return 0;
+        }
+        else {
+            return 1;
+        }
+    }
+
+    /**
+     * Cumulative CPU time taken by all mappers/reducers.
+     */
+    public long getCumulativeTime() throws IOException {
+        return getCounter(TaskCounter.CPU_MILLISECONDS);
+    }
+
+    /**
+     * Default name for the MapReduce job.
+     */
+    protected String getJobName() {
+        return "Rya reasoning, pass " + MRReasoningUtils.getCurrentIteration(getConf())
+            + ": " + this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+    }
+
+    /**
+     * Number of inconsistencies detected by this job.
+     */
+    public long getNumInconsistencies() throws IOException {
+        return getCounter(MultipleOutputs.class.getName(),
+            MRReasoningUtils.INCONSISTENT_OUT);
+    }
+
+    /**
+     * Number of new schema triples derived during this job.
+     */
+    public long getNumSchemaTriples() throws IOException {
+        return getCounter(MultipleOutputs.class.getName(),
+            MRReasoningUtils.SCHEMA_OUT);
+    }
+
+    /**
+     * Number of new instance triples that might be used for future reasoning
+     */
+    public long getNumUsefulOutput() throws IOException {
+        return getCounter(MultipleOutputs.class.getName(),
+            MRReasoningUtils.INTERMEDIATE_OUT);
+    }
+
+    /**
+     * Number of new instance triples that will not be used for future reasoning
+     */
+    public long getNumTerminalOutput() throws IOException {
+        return getCounter(MultipleOutputs.class.getName(),
+            MRReasoningUtils.TERMINAL_OUT);
+    }
+
+    /**
+     * Total number of new instance triples derived during this job.
+     */
+    public long getNumInstanceTriples() throws IOException {
+        return getNumUsefulOutput() + getNumTerminalOutput();
+    }
+
+    /**
+     * Number of instance triples seen as input during this job.
+     */
+    public long getNumInstanceInput() throws IOException {
+        return getCounter(COUNTERS.ABOX);
+    }
+
+    /**
+     * Number of schema triples seen as input during this job.
+     */
+    public long getNumSchemaInput() throws IOException {
+        return getCounter(COUNTERS.TBOX);
+    }
+
+    /**
+     * Increment the schema or instance triple counter, as appropriate.
+     */
+    protected static void countInput(boolean schema, TaskAttemptContext context) {
+        if (schema) {
+            context.getCounter(COUNTERS.TBOX).increment(1);
+        }
+        else {
+            context.getCounter(COUNTERS.ABOX).increment(1);
+        }
+    }
+
+    /**
+     * Add the schema file (TBox) to the distributed cache for the current job.
+     */
+    protected void distributeSchema() {
+        Path schemaPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
+        job.addCacheFile(schemaPath.toUri());
+    }
+
+    /**
+     * Set up the MapReduce job to use as inputs both an Accumulo table and the
+     * files containing previously derived information, excluding
+     * inconsistencies.  Looks for a file for every iteration number so far,
+     * preferring final cleaned up output from that iteration but falling back
+     * on intermediate data if necessary.
+     * @param tableMapper   Mapper class to use for database input
+     * @param rdfMapper     Mapper class to use for direct RDF input
+     * @param fileMapper    Mapper class to use for derived triples input
+     * @param filter        True to exclude previously derived data that couldn't be
+     *                      used to derive anything new at this point.
+     */
+    protected void configureMultipleInput(
+            Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
+            Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
+            Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+            boolean filter) throws IOException, AccumuloSecurityException {
+        Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
+        if (inputPath != null) {
+            configureRdfInput(inputPath, rdfMapper);
+        }
+        else {
+            configureAccumuloInput(tableMapper);
+        }
+        configureFileInput(fileMapper, filter);
+    }
+
+    /**
+     * Set up the MapReduce job to use as inputs both an Accumulo table and the
+     * files containing previously derived information. Looks for a file for
+     * every iteration number so far, preferring final cleaned up output from
+     * that iteration but falling back on intermediate data if necessary.
+     * @param tableMapper   Mapper class to use for database input
+     * @param rdfMapper     Mapper class to use for direct RDF input
+     * @param fileMapper    Mapper class to use for derived triples input
+     * @param incMapper     Mapper class to use for derived inconsistencies input
+     * @param filter        True to exclude previously derived data that couldn't be
+     *                      used to derive anything new at this point.
+     */
+    protected void configureMultipleInput(
+            Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
+            Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
+            Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+            Class<? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
+            boolean filter)
+            throws IOException, AccumuloSecurityException {
+        Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
+        if (inputPath != null) {
+            configureRdfInput(inputPath, rdfMapper);
+        }
+        else {
+            configureAccumuloInput(tableMapper);
+        }
+        configureFileInput(fileMapper, incMapper, filter);
+    }
+
+    /**
+     * Set up the MapReduce job to use file inputs from previous iterations,
+     * excluding inconsistencies found.
+     * @param   fileMapper  Mapper class to use for generated triples
+     * @param   filter      Exclude facts that aren't helpful for inference
+     */
+    protected void configureFileInput(
+            Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+            final boolean filter) throws IOException {
+        configureFileInput(fileMapper, null, filter);
+    }
+
+    /**
+     * Set up the MapReduce job to use file inputs from previous iterations.
+     * @param   fileMapper  Mapper class for generated triples
+     * @param   incMapper   Mapper class for generated inconsistencies
+     * @param   filter      Exclude facts that aren't helpful for inference
+     */
+    protected void configureFileInput(
+            Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+            Class <? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
+            final boolean filter) throws IOException {
+        // Set up file input for all iterations up to this one
+        Configuration conf = job.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        Path inputPath;
+        int iteration = MRReasoningUtils.getCurrentIteration(conf);
+        // Set min/max split, if not already provided. getLongBytes supplies a
+        // default when dfs.blocksize is unset and handles size suffixes such
+        // as "128m", which Long.parseLong would not.
+        long blocksize = conf.getLongBytes("dfs.blocksize", 128 * 1024 * 1024);
+        String minSplitProp = "mapreduce.input.fileinputformat.split.minsize";
+        String maxSplitProp = "mapreduce.input.fileinputformat.split.maxsize";
+        conf.set(minSplitProp, conf.get(minSplitProp, String.valueOf(blocksize)));
+        conf.set(maxSplitProp, conf.get(maxSplitProp, String.valueOf(blocksize*8)));
+        for (int i = 1; i <= iteration; i++) {
+            // Prefer cleaned output...
+            inputPath = MRReasoningUtils.getOutputPath(conf,
+                MRReasoningUtils.OUTPUT_BASE + i);
+            // But if there isn't any, try intermediate data:
+            if (!fs.isDirectory(inputPath)) {
+                inputPath = MRReasoningUtils.getOutputPath(conf,
+                    MRReasoningUtils.OUTPUT_BASE + i
+                    + MRReasoningUtils.TEMP_SUFFIX);
+            }
+            // And only proceed if we found one or the other.
+            if (fs.isDirectory(inputPath)) {
+                // Never include debug output. If filter is true, select only
+                // intermediate and schema data, otherwise include everything.
+                PathFilter f = new PathFilter() {
+                    public boolean accept(Path path) {
+                        String s = path.getName();
+                        if (s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
+                            return false;
+                        }
+                        else {
+                            return !filter
+                                || s.startsWith(MRReasoningUtils.INTERMEDIATE_OUT)
+                                || s.startsWith(MRReasoningUtils.SCHEMA_OUT);
+                        }
+                    }
+                };
+                for (FileStatus status : fs.listStatus(inputPath, f)) {
+                    if (status.getLen() > 0) {
+                        Path p = status.getPath();
+                        String s = p.getName();
+                        if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)) {
+                            if (incMapper != null) {
+                                MultipleInputs.addInputPath(job, p,
+                                    CombineSequenceFileInputFormat.class, incMapper);
+                            }
+                        }
+                        else {
+                            MultipleInputs.addInputPath(job, status.getPath(),
+                                CombineSequenceFileInputFormat.class, fileMapper);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Set up the MapReduce job to use Accumulo as an input.
+     * @param tableMapper Mapper class to use
+     */
+    protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper)
+            throws AccumuloSecurityException {
+        MRReasoningUtils.configureAccumuloInput(job);
+        MultipleInputs.addInputPath(job, new Path("/tmp/input"),
+            AccumuloInputFormat.class, tableMapper);
+    }
+
+    /**
+     * Set up the MapReduce job to use an RDF file as an input.
+     * @param rdfMapper class to use
+     */
+    protected void configureRdfInput(Path inputPath,
+            Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
+        Configuration conf = job.getConfiguration();
+        // Make the input serialization format explicit, defaulting to RDF/XML.
+        String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
+        conf.set(MRUtils.FORMAT_PROP, format);
+        MultipleInputs.addInputPath(job, inputPath,
+            RdfFileInputFormat.class, rdfMapper);
+    }
+
+    /**
+     * Set up the MapReduce job to output a schema (TBox).
+     */
+    protected void configureSchemaOutput() {
+        Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
+        SequenceFileOutputFormat.setOutputPath(job, outPath);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(SchemaWritable.class);
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+        MultipleOutputs.addNamedOutput(job, "schemaobj",
+            SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+            TextOutputFormat.class, Text.class, Text.class);
+        MultipleOutputs.setCountersEnabled(job, true);
+    }
+
+    /**
+     * Set up the MapReduce job to output newly derived triples. Outputs to
+     * directory [base]-[iteration].
+     */
+    protected void configureDerivationOutput() {
+        configureDerivationOutput(false);
+    }
+
+    /**
+     * Set up a MapReduce job to output newly derived triples.
+     * @param   intermediate    True if this is intermediate data. Outputs
+     *                          to [base]-[iteration]-[temp].
+     */
+    protected void configureDerivationOutput(boolean intermediate) {
+        Path outPath;
+        Configuration conf = job.getConfiguration();
+        int iteration = MRReasoningUtils.getCurrentIteration(conf);
+        if (intermediate) {
+            outPath = MRReasoningUtils.getOutputPath(conf,
+                MRReasoningUtils.OUTPUT_BASE + iteration
+                + MRReasoningUtils.TEMP_SUFFIX);
+        }
+        else {
+            outPath = MRReasoningUtils.getOutputPath(conf,
+                MRReasoningUtils.OUTPUT_BASE + iteration);
+        }
+        SequenceFileOutputFormat.setOutputPath(job, outPath);
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
+            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
+            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
+            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
+            SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
+        MultipleOutputs.setCountersEnabled(job, true);
+        // Set up an output for diagnostic info, if needed
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+            TextOutputFormat.class, Text.class, Text.class);
+    }
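+
+    /*
+     * In a mapper or reducer, derived facts can then be routed to these named
+     * outputs (sketch; `mos` is a hypothetical MultipleOutputs field):
+     *   mos.write(getOutputName(fact), fact, NullWritable.get());
+     */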
+
+    /**
+     * Set up a MapReduce job to output human-readable text.
+     */
+    protected void configureTextOutput(String destination) {
+        Path outPath;
+        outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
+        TextOutputFormat.setOutputPath(job, outPath);
+        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
+            TextOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
+            TextOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
+            TextOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
+            TextOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+            TextOutputFormat.class, Text.class, Text.class);
+        MultipleOutputs.setCountersEnabled(job, true);
+    }
+
+    /**
+     * Get the name of the output to send an inconsistency to.
+     * @return  The name of the output file(s) to send inconsistencies to
+     */
+    protected static String getOutputName(Derivation inconsistency) {
+        return MRReasoningUtils.INCONSISTENT_OUT;
+    }
+
+    /**
+     * Get the name of the output to send a fact to.
+     * @param   fact    The fact itself
+     * @param   finalOut    True if this is for final output, not intermediate
+     * @return  The name of the output file(s) to send this fact to
+     */
+    protected static String getOutputName(Fact fact, boolean finalOut) {
+        if (Schema.isSchemaTriple(fact.getTriple())) {
+            return MRReasoningUtils.SCHEMA_OUT;
+        }
+        else if (!finalOut && fact.isUseful()) {
+            return MRReasoningUtils.INTERMEDIATE_OUT;
+        }
+        else {
+            return MRReasoningUtils.TERMINAL_OUT;
+        }
+    }
+
+    /**
+     * Get the name of the output to send a fact to.
+     */
+    protected static String getOutputName(Fact fact) {
+        return getOutputName(fact, false);
+    }
+
+    /**
+     * Retrieve an arbitrary counter's value.
+     * @param   group Counter's group name
+     * @param   counter Name of the counter itself
+     */
+    public long getCounter(String group, String counter) throws IOException {
+        return job.getCounters().findCounter(group, counter).getValue();
+    }
+
+    /**
+     * Retrieve an arbitrary counter's value.
+     * @param   key     The Enum tied to this counter
+     */
+    public long getCounter(Enum<?> key) throws IOException {
+        return job.getCounters().findCounter(key).getValue();
+    }
+
+    /**
+     * Get the current iteration according to this job's configuration.
+     */
+    public int getIteration() {
+        return MRReasoningUtils.getCurrentIteration(getConf());
+    }
+
+    /**
+     * Get the job's JobID.
+     */
+    public JobID getJobID() {
+        return job.getJobID();
+    }
+
+    /**
+     * Get the elapsed wall-clock time, assuming the job is done.
+     */
+    public long getElapsedTime() throws IOException, InterruptedException {
+        return job.getFinishTime() - job.getStartTime();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
new file mode 100644
index 0000000..02cce66
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
@@ -0,0 +1,436 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.OpenRDFException;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+import org.openrdf.rio.ntriples.NTriplesParser;
+import org.openrdf.rio.rdfxml.RDFXMLParser;
+import org.openrdf.sail.memory.MemoryStore;
+
+/**
+ * Test the reasoner against OWL conformance tests in the database.
+ */
+public class ConformanceTest extends Configured implements Tool {
+    static final String TYPE = RDF.TYPE.stringValue();
+    static final String TEST = "http://www.w3.org/2007/OWL/testOntology#";
+    static final String TEST_CONSISTENCY = TEST + "ConsistencyTest";
+    static final String TEST_INCONSISTENCY = TEST + "InconsistencyTest";
+    static final String TEST_ENTAILMENT = TEST + "PositiveEntailmentTest";
+    static final String TEST_NONENTAILMENT = TEST + "NegativeEntailmentTest";
+    static final String TEST_ID = TEST + "identifier";
+    static final String TEST_DESC = TEST + "description";
+    static final String TEST_PROFILE = TEST + "profile";
+    static final String TEST_PREMISE = TEST + "rdfXmlPremiseOntology";
+    static final String TEST_CONCLUSION = TEST + "rdfXmlConclusionOntology";
+    static final String TEST_NONCONCLUSION = TEST + "rdfXmlNonConclusionOntology";
+    static final String TEST_RL = TEST + "RL";
+    static final String TEST_SEMANTICS = TEST + "semantics";
+    static final String TEST_RDFBASED = TEST + "RDF-BASED";
+
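+    /**
+     * Holds one conformance test's metadata and results. Also acts as the
+     * RDF handler for the conclusion (or nonconclusion) graph: entailment
+     * tests collect expected triples, nonentailment tests unexpected ones.
+     */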
+    private static class OwlTest extends RDFHandlerBase {
+        Value uri;
+        String name;
+        String description;
+        String premise;
+        String compareTo;
+        Set<String> types = new HashSet<>();
+        boolean success;
+        Set<Statement> expected = new HashSet<>();
+        Set<Statement> unexpected = new HashSet<>();
+        Set<Statement> inferred = new HashSet<>();
+        Set<Statement> error = new HashSet<>();
+        @Override
+        public void handleStatement(Statement st) {
+            if (types.contains(TEST_ENTAILMENT)) {
+                expected.add(st);
+            }
+            else if (types.contains(TEST_NONENTAILMENT)) {
+                unexpected.add(st);
+            }
+        }
+        String type() {
+            StringBuilder sb = new StringBuilder();
+            if (types.contains(TEST_CONSISTENCY)) {
+                sb.append("{Consistency}");
+            }
+            if (types.contains(TEST_INCONSISTENCY)) {
+                sb.append("{Inconsistency}");
+            }
+            if (types.contains(TEST_ENTAILMENT)) {
+                sb.append("{Entailment}");
+            }
+            if (types.contains(TEST_NONENTAILMENT)) {
+                sb.append("{Nonentailment}");
+            }
+            return sb.toString();
+        }
+    }
+
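+    /**
+     * Collects the triples parsed from the reasoner's output files.
+     */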
+    private static class OutputCollector extends RDFHandlerBase {
+        Set<Statement> triples = new HashSet<>();
+        @Override
+        public void handleStatement(Statement st) {
+            triples.add(st);
+        }
+    }
+
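+    /**
+     * Run via ToolRunner. A hypothetical invocation (jar name and paths are
+     * illustrative only, not part of the build):
+     *   hadoop jar rya-reasoning.jar mvm.rya.reasoning.mr.ConformanceTest \
+     *       owl-conformance.rdf /tmp/conformance-work
+     */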
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new ConformanceTest(), args);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        // Validate command
+        if (args.length < 1 || args.length > 2) {
+            System.out.println("Usage:\n");
+            System.out.println("\tConformanceTest [configuration options] "
+                + "<test-file> <temp-dir>\n");
+            System.out.println("to load test data from an RDF file "
+                + "(configuration property " + MRUtils.FORMAT_PROP
+                + " specifies the format, default RDF/XML); or\n");
+            System.out.println("\tConformanceTest [configuration options] <temp-dir>\n");
+            System.out.println("to load test data from a Rya instance (specified "
+                + "using standard configuration properties).\n");
+            System.out.println("For each test given, run the reasoner over the "
+                + "premise ontology using a temporary Mini Accumulo instance "
+                + "at <temp-dir>, then report conformance results.");
+            System.exit(1);
+        }
+
+        Set<Value> conformanceTestURIs = new HashSet<>();
+        Collection<OwlTest> conformanceTests;
+        List<OwlTest> successes = new LinkedList<>();
+        List<OwlTest> failures = new LinkedList<>();
+        Configuration conf = getConf();
+        Repository repo;
+        File workingDir;
+
+        // If tests are in a file, stick them in a repository for querying
+        if (args.length == 2) {
+            workingDir = new File(args[1]);
+            RDFFormat inputFormat = RDFFormat.RDFXML;
+            String formatString = conf.get(MRUtils.FORMAT_PROP);
+            if (formatString != null) {
+                inputFormat = RDFFormat.valueOf(formatString);
+            }
+            repo = new SailRepository(new MemoryStore());
+            repo.initialize();
+            RepositoryConnection conn = repo.getConnection();
+            conn.add(new FileInputStream(args[0]), "", inputFormat);
+            conn.close();
+        }
+        // Otherwise, get a Rya repository
+        else {
+            workingDir = new File(args[0]);
+            repo = MRReasoningUtils.getRepository(conf);
+            repo.initialize();
+        }
+
+        // Query for the tests we're interested in
+        RepositoryConnection conn = repo.getConnection();
+        conformanceTestURIs.addAll(getTestURIs(conn, TEST_INCONSISTENCY));
+        conformanceTestURIs.addAll(getTestURIs(conn, TEST_CONSISTENCY));
+        conformanceTestURIs.addAll(getTestURIs(conn, TEST_ENTAILMENT));
+        conformanceTestURIs.addAll(getTestURIs(conn, TEST_NONENTAILMENT));
+        conformanceTests = getTests(conn, conformanceTestURIs);
+        conn.close();
+        repo.shutDown();
+
+        // Set up a MiniAccumulo cluster and set up conf to connect to it
+        String username = "root";
+        String password = "root";
+        MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
+        mini.start();
+        conf.set(MRUtils.AC_INSTANCE_PROP, mini.getInstanceName());
+        conf.set(MRUtils.AC_ZK_PROP, mini.getZooKeepers());
+        conf.set(MRUtils.AC_USERNAME_PROP, username);
+        conf.set(MRUtils.AC_PWD_PROP, password);
+        conf.setBoolean(MRUtils.AC_MOCK_PROP, false);
+        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "temp_");
+        // Run the conformance tests
+        int result;
+        for (OwlTest test : conformanceTests) {
+            System.out.println(test.uri);
+            result = runTest(conf, args, test);
+            if (result != 0) {
+                return result;
+            }
+            if (test.success) {
+                successes.add(test);
+                System.out.println("(SUCCESS)");
+            }
+            else {
+                failures.add(test);
+                System.out.println("(FAIL)");
+            }
+        }
+        mini.stop();
+
+        System.out.println("\n" + successes.size() + " successful tests:");
+        for (OwlTest test : successes) {
+            System.out.println("\t[SUCCESS] " + test.type() + " " + test.name);
+        }
+        System.out.println("\n" + failures.size() + " failed tests:");
+        for (OwlTest test : failures) {
+            System.out.println("\t[FAIL] " + test.type() + " " + test.name);
+            System.out.println("\t\t(" + test.description + ")");
+            for (Statement triple : test.error) {
+                if (test.types.contains(TEST_ENTAILMENT)) {
+                    System.out.println("\t\tExpected: " + triple);
+                }
+                else if (test.types.contains(TEST_NONENTAILMENT)) {
+                    System.out.println("\t\tUnexpected: " + triple);
+                }
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Verify that we can infer the correct triples or detect an inconsistency.
+     * @param   conf    Specifies working directory, etc.
+     * @param   args    Remaining arguments, passed through to the reasoner
+     * @param   test    Contains premise/conclusion graphs; stores the result
+     * @return  Return value of the MapReduce job
+     */
+    int runTest(Configuration conf, String[] args, OwlTest test)
+            throws Exception {
+        conf.setInt(MRReasoningUtils.STEP_PROP, 0);
+        conf.setInt(MRReasoningUtils.SCHEMA_UPDATE_PROP, 0);
+        conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, true);
+        conf.setBoolean(MRReasoningUtils.OUTPUT_FLAG, true);
+        // Connect to MiniAccumulo and load the test
+        Repository repo = MRReasoningUtils.getRepository(conf);
+        repo.initialize();
+        RepositoryConnection conn = repo.getConnection();
+        conn.clear();
+        conn.add(new StringReader(test.premise), "", RDFFormat.RDFXML);
+        conn.close();
+        repo.shutDown();
+        // Run the reasoner
+        ReasoningDriver reasoner = new ReasoningDriver();
+        int result = ToolRunner.run(conf, reasoner, args);
+        test.success = (result == 0);
+        // Inconsistency test: successful if determined inconsistent
+        if (test.types.contains(TEST_INCONSISTENCY)) {
+            test.success = test.success && reasoner.hasInconsistencies();
+        }
+        // Consistency test: successful if determined consistent
+        if (test.types.contains(TEST_CONSISTENCY)) {
+            test.success = test.success && !reasoner.hasInconsistencies();
+        }
+        // Other types: we'll need to look at the inferred triples/schema
+        if (test.types.contains(TEST_NONENTAILMENT)
+            || test.types.contains(TEST_ENTAILMENT))  {
+            System.out.println("Reading inferred triples...");
+            // Read in the inferred triples from HDFS:
+            Schema schema = MRReasoningUtils.loadSchema(conf);
+            FileSystem fs = FileSystem.get(conf);
+            Path path = MRReasoningUtils.getOutputPath(conf, "final");
+            OutputCollector inferred = new OutputCollector();
+            NTriplesParser parser = new NTriplesParser();
+            parser.setRDFHandler(inferred);
+            if (fs.isDirectory(path)) {
+                for (FileStatus status : fs.listStatus(path)) {
+                    String s = status.getPath().getName();
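+                    // Skip inconsistency and debug output files; only the
+                    // inferred triples are compared against the conclusions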
+                    if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)
+                        || s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
+                        continue;
+                    }
+                    BufferedReader br = new BufferedReader(
+                        new InputStreamReader(fs.open(status.getPath())));
+                    parser.parse(br, "");
+                    br.close();
+                }
+            }
+            MRReasoningUtils.deleteIfExists(conf, "final");
+            test.inferred.addAll(inferred.triples);
+            // Entailment test: successful if expected triples were inferred
+            if (test.types.contains(TEST_ENTAILMENT)) {
+                // Check expected inferences against the inferred triples and
+                // the schema reasoner
+                for (Statement st : test.expected) {
+                    Fact fact = new Fact(st);
+                    if (!test.inferred.contains(st)
+                            && !triviallyTrue(fact.getTriple(), schema)
+                            && !schema.containsTriple(fact.getTriple())) {
+                        test.error.add(st);
+                    }
+                }
+            }
+            // Non-entailment test: failure if non-expected triples inferred
+            if (test.types.contains(TEST_NONENTAILMENT)) {
+                for (Statement st : test.unexpected) {
+                    Fact fact = new Fact(st);
+                    if (test.inferred.contains(st)
+                        || schema.containsTriple(fact.getTriple())) {
+                        test.error.add(st);
+                    }
+                }
+            }
+            test.success = test.success && test.error.isEmpty();
+        }
+        conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, false);
+        MRReasoningUtils.clean(conf);
+        return result;
+    }
+
+    /**
+     * Query a connection for conformance tests matching a particular
+     * test type.
+     */
+    Set<Value> getTestURIs(RepositoryConnection conn, String testType)
+            throws IOException, OpenRDFException {
+        Set<Value> testURIs = new HashSet<>();
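+        // Restrict to tests targeting the RL profile under RDF-based
+        // semantics, matching the ruleset implemented by the reasoner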
+        TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+            "select ?test where { " +
+            "?test <" + TYPE + "> <" + testType + "> .\n" +
+            "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
+            "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
+            "}");
+        TupleQueryResult queryResult = query.evaluate();
+        while (queryResult.hasNext()) {
+            BindingSet bindings = queryResult.next();
+            testURIs.add(bindings.getValue("test"));
+        }
+        queryResult.close();
+        return testURIs;
+    }
+
+    /**
+     * Query a connection for conformance test details.
+     */
+    Collection<OwlTest> getTests(RepositoryConnection conn, Set<Value> testURIs)
+            throws IOException, OpenRDFException {
+        Map<Value, OwlTest> tests = new HashMap<>();
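+        // A test can have several types, so it may appear in several result
+        // rows; build one OwlTest per URI and accumulate its types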
+        TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+            "select * where { " +
+            "?test <" + TYPE + "> ?testType .\n" +
+            "?test <" + TEST_PREMISE + "> ?graph .\n" +
+            "?test <" + TEST_ID + "> ?name .\n" +
+            "?test <" + TEST_DESC + "> ?description .\n" +
+            "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
+            "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
+            "OPTIONAL {?test <" + TEST_CONCLUSION + "> ?conclusion .}\n" +
+            "OPTIONAL {?test <" + TEST_NONCONCLUSION + "> ?nonentailed .}\n" +
+            "}");
+        TupleQueryResult queryResult = query.evaluate();
+        while (queryResult.hasNext()) {
+            BindingSet bindings = queryResult.next();
+            Value uri = bindings.getValue("test");
+            if (testURIs.contains(uri)) {
+                OwlTest test;
+                if (tests.containsKey(uri)) {
+                    test = tests.get(uri);
+                }
+                else {
+                    test = new OwlTest();
+                    test.uri = uri;
+                    test.name = bindings.getValue("name").stringValue();
+                    test.description = bindings.getValue("description").stringValue();
+                    test.premise = bindings.getValue("graph").stringValue();
+                    if (bindings.hasBinding("conclusion")) {
+                        test.compareTo = bindings.getValue("conclusion").stringValue();
+                    }
+                    if (bindings.hasBinding("nonentailed")) {
+                        test.compareTo = bindings.getValue("nonentailed").stringValue();
+                    }
+                    tests.put(uri, test);
+                }
+                test.types.add(bindings.getValue("testType").stringValue());
+            }
+        }
+        for (OwlTest test : tests.values()) {
+            if (test.compareTo != null) {
+                RDFXMLParser parser = new RDFXMLParser();
+                parser.setRDFHandler(test);
+                parser.parse(new StringReader(test.compareTo), "");
+            }
+        }
+        queryResult.close();
+        return tests.values();
+    }
+
+    /**
+     * Determine whether a statement is trivially true for the purposes of
+     * entailment tests: e.g. an implicit "[bnode] type Ontology" triple, or
+     * a "[class] type Class" triple as long as the class exists.
+     */
+    boolean triviallyTrue(Statement triple, Schema schema) {
+        Resource s = triple.getSubject();
+        URI p = triple.getPredicate();
+        Value o = triple.getObject();
+        if (p.equals(RDF.TYPE)) {
+            if (o.equals(OWL.ONTOLOGY)) {
+                return true;
+            }
+            else if (o.equals(OWL.CLASS)) {
+                return schema.hasClass(s);
+            }
+            else if ((o.equals(OWL.OBJECTPROPERTY)
+                || o.equals(OWL.DATATYPEPROPERTY))
+                && s instanceof URI) {
+                // The object/datatype property distinction isn't maintained
+                // here, and is irrelevant to the RL rules
+                return schema.hasProperty((URI) s);
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
new file mode 100644
index 0000000..3e54367
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
@@ -0,0 +1,225 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
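+/**
+ * MapReduce tool that eliminates duplicate facts: the reducer sees every
+ * derivation of a fact together, outputs the fact only if it wasn't already
+ * known before the current iteration, and keeps the simplest derivation.
+ */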
+public class DuplicateElimination extends AbstractReasoningTool {
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new DuplicateElimination(), args));
+    }
+
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        configureMultipleInput(DuplicateTableMapper.class,
+            DuplicateRdfMapper.class, DuplicateFileMapper.class,
+            InconsistencyMapper.class, false);
+        job.setMapOutputKeyClass(Fact.class);
+        job.setMapOutputValueClass(Derivation.class);
+        job.setReducerClass(DuplicateEliminationReducer.class);
+        configureDerivationOutput();
+    }
+
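+    /**
+     * Base mapper: emits each fact keyed by its content, so all derivations
+     * of the same fact meet at one reducer; optionally logs every input
+     * record to the debug output.
+     */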
+    public static class DuplicateEliminationMapper<K, V> extends Mapper<K, V,
+            Fact, Derivation> {
+        private MultipleOutputs<?, ?> debugOut;
+        private boolean debug;
+        private Text debugK = new Text();
+        private Text debugV = new Text();
+        private Fact emptyFact = new Fact();
+        @Override
+        public void setup(Context context) {
+            Configuration conf = context.getConfiguration();
+            debug = MRReasoningUtils.debug(conf);
+            if (debug) {
+                debugOut = new MultipleOutputs<>(context);
+            }
+        }
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (debugOut != null) {
+                debugOut.close();
+            }
+        }
+        protected void process(Context context, Fact fact, Derivation d,
+                String source) throws IOException, InterruptedException {
+            context.write(fact, d);
+        }
+
+        protected void process(Context context, Fact fact,
+                String source) throws IOException, InterruptedException {
+            if (debug) {
+                debugK.set("MAP_" + source + ": " + fact.explain(false));
+                debugV.set("iteration=" + fact.getIteration()
+                    + ", size=" + fact.span());
+                debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+            }
+            Derivation d = fact.unsetDerivation();
+            process(context, fact, d, source);
+        }
+
+        protected void process(Context context, Derivation d,
+                String source) throws IOException, InterruptedException {
+            if (debug) {
+                debugK.set("MAP_" + source + ": inconsistency : "
+                    + d.explain(false));
+                debugV.set("iteration=" + d.getIteration()
+                    + ", size=" + d.span());
+                debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+            }
+            emptyFact.setDerivation(d);
+            process(context, emptyFact, d, source);
+        }
+    }
+
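+    /** Reads facts from the Accumulo table. */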
+    public static class DuplicateTableMapper extends DuplicateEliminationMapper<
+            Key, Value> {
+        private Fact inputTriple = new Fact();
+        @Override
+        public void map(Key row, Value data, Context context)
+                throws IOException, InterruptedException {
+            inputTriple.setTriple(MRReasoningUtils.getStatement(row, data,
+                context.getConfiguration()));
+            process(context, inputTriple, "TABLE");
+        }
+    }
+
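+    /** Reads facts generated by an earlier reasoning step. */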
+    public static class DuplicateFileMapper extends DuplicateEliminationMapper<
+            Fact, NullWritable> {
+        @Override
+        public void map(Fact key, NullWritable value, Context context)
+                throws IOException, InterruptedException {
+            process(context, key, "STEP-" + key.getIteration());
+        }
+    }
+
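+    /** Reads facts from the original RDF input. */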
+    public static class DuplicateRdfMapper extends DuplicateEliminationMapper<
+            LongWritable, RyaStatementWritable> {
+        private Fact inputTriple = new Fact();
+        @Override
+        public void map(LongWritable key, RyaStatementWritable value,
+                Context context) throws IOException, InterruptedException {
+            inputTriple.setTriple(value.getRyaStatement());
+            process(context, inputTriple, "RDF");
+        }
+    }
+
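+    /** Reads inconsistency derivations found by an earlier step. */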
+    public static class InconsistencyMapper extends DuplicateEliminationMapper<
+            Derivation, NullWritable> {
+        @Override
+        public void map(Derivation key, NullWritable value, Context context)
+                throws IOException, InterruptedException {
+            process(context, key, "INCONSISTENCY-STEP-" + key.getIteration());
+        }
+    }
+
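+    /**
+     * Outputs each distinct fact at most once: a fact is kept only if none
+     * of its derivations predate the current iteration, and the derivation
+     * with the smallest span is preferred.
+     */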
+    public static class DuplicateEliminationReducer extends Reducer<
+            Fact, Derivation, Fact, NullWritable> {
+        protected MultipleOutputs<?, ?> mout;
+        protected int current;
+        protected boolean debug;
+        protected Logger log = Logger.getLogger(DuplicateEliminationReducer.class);
+        protected long totalInput = 0;
+        protected long totalFacts = 0;
+        protected long totalOutput = 0;
+        @Override
+        public void setup(Context context) {
+            Configuration conf = context.getConfiguration();
+            mout = new MultipleOutputs<>(context);
+            current = MRReasoningUtils.getCurrentIteration(conf);
+            debug = MRReasoningUtils.debug(conf);
+        }
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            mout.close();
+            log.info("Input records processed: " + totalInput);
+            log.info("Distinct facts: " + totalFacts);
+            log.info("Output facts: " + totalOutput);
+        }
+        @Override
+        public void reduce(Fact fact, Iterable<Derivation> derivations,
+                Context context) throws IOException, InterruptedException {
+            log.debug(fact.toString() + ":");
+            totalFacts++;
+            // We only need to output this fact if it hasn't been derived
+            // before this step (and wasn't in the original data, marked
+            // as iteration 0). If we do want to output it, prefer the simplest
+            // derivation.
+            Derivation best = null;
+            boolean newFact = true;
+            int count = 0;
+            for (Derivation derivation : derivations) {
+                count++;
+                if (newFact) {
+                    if (derivation.getIteration() >= current) {
+                        // Valid so far; check if this is the best derivation:
+                        if (best == null || best.span() > derivation.span()) {
+                            best = derivation.clone();
+                        }
+                    }
+                    else if (debug) {
+                        newFact = false;
+                    }
+                    else {
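+                        // Known before this step (or from the original
+                        // data): skip it without scanning the rest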
+                        return;
+                    }
+                }
+                if (debug) {
+                    mout.write(MRReasoningUtils.DEBUG_OUT,
+                        new Text("DE " + fact.toString() + derivation.explain(false)),
+                        new Text(Integer.toString(count) + "\t" + newFact));
+                }
+            }
+            totalInput += count;
+            if (newFact) {
+                totalOutput++;
+                if (fact.isEmpty()) {
+                    // If there's no triple, it must be an inconsistency
+                    mout.write(getOutputName(best), best, NullWritable.get());
+                }
+                else {
+                    // Output a triple
+                    fact.setDerivation(best);
+                    mout.write(getOutputName(fact), fact, NullWritable.get());
+                }
+            }
+            log.debug(totalFacts + " facts, " + totalInput + " input records, "
+                + totalOutput + " output records");
+        }
+    }
+}