Posted to commits@rya.apache.org by pu...@apache.org on 2016/06/21 17:11:42 UTC
[3/5] incubator-rya git commit: Included project rya.reasoning.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
new file mode 100644
index 0000000..510fc75
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
@@ -0,0 +1,538 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.OWL;
+
+/**
+ * Hold on to facts about the schema (TBox/RBox) and perform what reasoning we
+ * can without instance data.
+ * <p>
+ * The Schema object, together with the OwlClass and OwlProperty objects it
+ * keeps track of, is responsible for schema reasoning, or the "Semantics of
+ * Schema Vocabulary" rules from the OWL RL/RDF specificiation. Some rules are
+ * handled dynamically, while the rest must be computed by calling closure()
+ * once the schema data has been read in.
+ * <p>
+ * Schema rules implemented in {@link OwlClass}:
+ * scm-cls, scm-eqc1, scm-eqc2, scm-sco,
+ * scm-hv, scm-svf2, scm-avf2
+ * <p>
+ * Schema rules implemented in {@link OwlProperty}:
+ * scm-op, scm-dp, scm-eqp1, scm-eqp2, scm-spo, scm-dom1, scm-dom2,
+ * scm-rng1, scm-rng2, scm-svf1, scm-avf1
+ * <p>
+ * TODO: scm-cls officially states owl:Nothing is a subclass of every class.
+ * Do we need to explicitly do something with this fact?
+ */
+public class Schema {
+ // Statements using these predicates are automatically relevant schema
+ // information.
+ private static final Set<URI> schemaPredicates = new HashSet<>();
+ private static final URI[] schemaPredicateURIs = {
+ RDFS.SUBCLASSOF,
+ RDFS.SUBPROPERTYOF,
+ RDFS.DOMAIN,
+ RDFS.RANGE,
+ OWL.EQUIVALENTCLASS,
+ OWL.EQUIVALENTPROPERTY,
+ OWL.INVERSEOF,
+ OWL.DISJOINTWITH,
+ OWL.COMPLEMENTOF,
+ OWL.ONPROPERTY,
+ OWL.SOMEVALUESFROM,
+ OWL.ALLVALUESFROM,
+ OWL.HASVALUE,
+ OWL.MAXCARDINALITY,
+ OWL2.MAXQUALIFIEDCARDINALITY,
+ OWL2.PROPERTYDISJOINTWITH,
+ OWL2.ONCLASS
+ };
+
+ // The fact that something is one of these types is schema information.
+ private static final Set<Resource> schemaTypes = new HashSet<>();
+ private static final Resource[] schemaTypeURIs = {
+ OWL.TRANSITIVEPROPERTY,
+ OWL2.IRREFLEXIVEPROPERTY,
+ OWL.SYMMETRICPROPERTY,
+ OWL2.ASYMMETRICPROPERTY,
+ OWL.FUNCTIONALPROPERTY,
+ OWL.INVERSEFUNCTIONALPROPERTY
+ };
+
+ static {
+ for (URI uri : schemaPredicateURIs) {
+ schemaPredicates.add(uri);
+ }
+ for (Resource uri : schemaTypeURIs) {
+ schemaTypes.add(uri);
+ }
+ }
+
+ /**
+ * Does this triple/statement encode potentially relevant schema
+ * information?
+ */
+ public static boolean isSchemaTriple(Statement triple) {
+ URI pred = triple.getPredicate();
+ // Triples with certain predicates are schema triples,
+ if (schemaPredicates.contains(pred)) {
+ return true;
+ }
+ // And certain type assertions are schema triples.
+ else if (pred.equals(RDF.TYPE)) {
+ if (schemaTypes.contains(triple.getObject())) {
+ return true;
+ }
+ }
+ return false;
+ }
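+
+    // Illustrative examples (the ex:* URIs are hypothetical):
+    //   "ex:A rdfs:subClassOf ex:B" is a schema triple (schema predicate);
+    //   "ex:p rdf:type owl:TransitiveProperty" is a schema triple (schema type);
+    //   "ex:x ex:p ex:y" is instance data, so isSchemaTriple returns false.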
+
+ /**
+ * Map URIs to schema information about a property
+ */
+ protected Map<URI, OwlProperty> properties = new HashMap<>();
+
+ /**
+ * Map Resources to schema information about a class/restriction
+ */
+ protected Map<Resource, OwlClass> classes = new HashMap<>();
+
+ /**
+ * Get schema information for a class, for reading and writing.
+ * Instantiates OwlClass if it doesn't yet exist.
+ */
+ public OwlClass getClass(Resource c) {
+ if (!classes.containsKey(c)) {
+ classes.put(c, new OwlClass(c));
+ }
+ return classes.get(c);
+ }
+
+ /**
+ * Get schema information for a class, for reading and writing.
+ * Assumes this Value refers to a class Resource.
+ */
+ public OwlClass getClass(Value c) {
+ return getClass((Resource) c);
+ }
+
+ /**
+ * Get schema information for a property, for reading and writing.
+ * Instantiates OwlProperty if it doesn't yet exist.
+ */
+ public OwlProperty getProperty(URI p) {
+ if (!properties.containsKey(p)) {
+ properties.put(p, new OwlProperty(p));
+ }
+ return properties.get(p);
+ }
+
+ /**
+ * Get schema information for a property, for reading and writing.
+ * Assumes this Value refers to a property URI.
+ */
+ public OwlProperty getProperty(Value p) {
+ return getProperty((URI) p);
+ }
+
+ /**
+ * Return whether this resource corresponds to a property.
+ */
+ public boolean hasProperty(URI r) {
+ return properties.containsKey(r);
+ }
+
+ /**
+ * Return whether this resource corresponds to a class.
+ */
+ public boolean hasClass(Resource r) {
+ return classes.containsKey(r);
+ }
+
+ /**
+ * Return whether this resource corresponds to a property restriction.
+ */
+ public boolean hasRestriction(Resource r) {
+ return classes.containsKey(r) && !classes.get(r).getOnProperty().isEmpty();
+ }
+
+ public Schema() {
+ }
+
+ /**
+ * Incorporate a new triple into the schema.
+ */
+ public void processTriple(Statement triple) {
+ Resource s = triple.getSubject();
+ URI p = triple.getPredicate();
+ Value o = triple.getObject();
+ if (isSchemaTriple(triple)) {
+ // For a type statement to be schema information, it must yield
+ // some boolean information about a property.
+ if (p.equals(RDF.TYPE)) {
+ if (schemaTypes.contains(o)) {
+ addPropertyType((URI) s, (Resource) o);
+ }
+ }
+
+ // Domain/range
+ else if (p.equals(RDFS.DOMAIN)) {
+ // Don't add trivial domain owl:Thing
+ if (!o.equals(OWL.THING)) {
+ getProperty(s).addDomain(getClass(o));
+ }
+ }
+ else if (p.equals(RDFS.RANGE)) {
+ // Don't add trivial range owl:Thing
+ if (!o.equals(OWL.THING)) {
+ getProperty(s).addRange(getClass(o));
+ }
+ }
+
+ // Sub/super relations
+ else if (p.equals(RDFS.SUBCLASSOF)) {
+            // Everything is a subclass of owl:Thing; we don't need to
+            // store that information.
+ if (!o.equals(OWL.THING)) {
+ getClass(s).addSuperClass(getClass(o));
+ }
+ }
+ else if (p.equals(RDFS.SUBPROPERTYOF)) {
+ getProperty(s).addSuperProperty(getProperty(o));
+ }
+
+ // Equivalence relations
+ else if (p.equals(OWL.EQUIVALENTCLASS)) {
+ getClass(s).addEquivalentClass(getClass(o));
+ }
+ else if (p.equals(OWL.EQUIVALENTPROPERTY)) {
+ getProperty(s).addEquivalentProperty(getProperty(o));
+ }
+
+ // Inverse properties
+ else if (p.equals(OWL.INVERSEOF)) {
+ getProperty(s).addInverse(getProperty(o));
+ getProperty(o).addInverse(getProperty(s));
+ }
+
+ // Complementary classes
+ else if (p.equals(OWL.COMPLEMENTOF)) {
+ getClass(s).addComplement(getClass(o));
+ getClass(o).addComplement(getClass(s));
+ }
+
+ // Disjoint classes and properties
+ else if (p.equals(OWL.DISJOINTWITH)) {
+ getClass(s).addDisjoint(getClass(o));
+ getClass(o).addDisjoint(getClass(s));
+ }
+ else if (p.equals(OWL2.PROPERTYDISJOINTWITH)) {
+ getProperty(s).addDisjoint(getProperty(o));
+ getProperty(o).addDisjoint(getProperty(s));
+ }
+
+ // Property restriction info
+ else if (p.equals(OWL.ONPROPERTY)) {
+ getClass(s).addProperty(getProperty(o));
+ }
+ else if (p.equals(OWL.SOMEVALUESFROM)) {
+ getClass(s).addSvf(getClass(o));
+ }
+ else if (p.equals(OWL.ALLVALUESFROM)) {
+ getClass(s).addAvf(getClass(o));
+ }
+ else if (p.equals(OWL2.ONCLASS)) {
+ getClass(s).addClass(getClass(o));
+ }
+ else if (p.equals(OWL.HASVALUE)) {
+ getClass(s).addValue(o);
+ }
+ else if (p.equals(OWL.MAXCARDINALITY)) {
+ getClass(s).setMaxCardinality(o);
+ }
+ else if (p.equals(OWL2.MAXQUALIFIEDCARDINALITY)) {
+ getClass(s).setMaxQualifiedCardinality(o);
+ }
+ }
+ }
+
+ /**
+ * Add a particular characteristic to a property.
+ */
+ private void addPropertyType(URI p, Resource t) {
+ OwlProperty prop = getProperty(p);
+ if (t.equals(OWL.TRANSITIVEPROPERTY)) {
+ prop.setTransitive();
+ }
+ else if (t.equals(OWL.SYMMETRICPROPERTY)) {
+ prop.setSymmetric();
+ }
+ else if (t.equals(OWL2.ASYMMETRICPROPERTY)) {
+ prop.setAsymmetric();
+ }
+ else if (t.equals(OWL.FUNCTIONALPROPERTY)) {
+ prop.setFunctional();
+ }
+ else if (t.equals(OWL.INVERSEFUNCTIONALPROPERTY)) {
+ prop.setInverseFunctional();
+ }
+ else if (t.equals(OWL2.IRREFLEXIVEPROPERTY)) {
+ prop.setIrreflexive();
+ }
+ }
+
+ /**
+ * Perform schema-level reasoning to compute the closure of statements
+ * already represented in this schema. This includes things like subClassOf
+ * transitivity and applying domain/range to subclasses.
+ */
+ public void closure() {
+ // RL rule scm-spo: subPropertyOf transitivity
+ // (takes in subproperty info; yields subproperty info)
+ for (OwlProperty subprop : properties.values()) {
+ subprop.computeSuperProperties();
+ }
+
+ // RL rules scm-hv, scm-svf2, scm-avf2: restrictions & subproperties
+ // (take in subproperty info & prop. restrictions; yield subclass info)
+ for (OwlClass c1 : classes.values()) {
+ for (OwlClass c2 : classes.values()) {
+ c1.compareRestrictions(c2);
+ }
+ }
+
+ // The following two steps can affect each other, so repeat the block
+ // as many times as necessary.
+ boolean repeat;
+ do {
+ // RL rule scm-sco: subClassOf transitivity
+ // (takes in subclass info; yields subclass info)
+ // (This traverses the complete hierarchy, so we don't need to loop
+ // again if changes are only made in this step)
+ for (OwlClass subclass : classes.values()) {
+ subclass.computeSuperClasses();
+ }
+ // RL rules scm-svf1, scm-avf1: property restrictions & subclasses
+ // (take in subclass info & prop. restrictions; yield subclass info)
+ // (If changes are made here, loop through both steps again)
+ repeat = false;
+ for (OwlProperty prop : properties.values()) {
+ repeat = prop.compareRestrictions() || repeat;
+ }
+ } while (repeat);
+
+ // Apply RL rules scm-dom1, scm-rng1, scm-dom2, scm-rng2:
+ // (take in subclass/subproperty & domain/range; yield domain/range)
+ for (OwlProperty prop : properties.values()) {
+ prop.inheritDomainRange();
+ }
+ }
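+
+    // Illustrative trace (hypothetical classes A, B, C): given the axioms
+    // "A subClassOf B" and "B subClassOf C", the scm-sco step above derives
+    // "A subClassOf C", so containsTriple reports it once closure() has run.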
+
+ /**
+ * Determine whether a fact is contained in the Schema object
+ * relationships or implied by schema rules.
+ * @return True if this schema contains the semantics of the triple
+ */
+ public boolean containsTriple(Statement triple) {
+ // The schema certainly doesn't contain it if it's not a
+ // schema-relevant triple at all.
+ if (isSchemaTriple(triple)) {
+ Resource s = triple.getSubject();
+ URI p = triple.getPredicate();
+ Value o = triple.getObject();
+ // If this is telling us something about a property:
+ if (properties.containsKey(s)) {
+ OwlProperty prop = properties.get(s);
+ // Property types:
+ if (p.equals(RDF.TYPE)) {
+ if ((o.equals(OWL.TRANSITIVEPROPERTY)
+ && prop.isTransitive())
+ || (o.equals(OWL2.IRREFLEXIVEPROPERTY)
+ && prop.isIrreflexive())
+ || (o.equals(OWL.SYMMETRICPROPERTY)
+ && prop.isSymmetric())
+ || (o.equals(OWL2.ASYMMETRICPROPERTY)
+ && prop.isAsymmetric())
+ || (o.equals(OWL.FUNCTIONALPROPERTY)
+ && prop.isFunctional())
+ || (o.equals(OWL.INVERSEFUNCTIONALPROPERTY)
+ && prop.isInverseFunctional())) {
+ return true;
+ }
+ }
+ // Relationships with other properties:
+ if ((p.equals(RDFS.SUBPROPERTYOF)
+ && prop.getSuperProperties().contains(o))
+ || (p.equals(OWL2.PROPERTYDISJOINTWITH)
+ && prop.getDisjointProperties().contains(o))
+ || (p.equals(OWL.EQUIVALENTPROPERTY)
+ && prop.getEquivalentProperties().contains(o))
+ || (p.equals(OWL.INVERSEOF)
+ && prop.getInverseProperties().contains(o))) {
+ return true;
+ }
+ // Relationships with classes:
+ if ((p.equals(RDFS.DOMAIN)
+ && prop.getDomain().contains(o))
+ || (p.equals(RDFS.RANGE)
+ && prop.getRange().contains(o))) {
+ return true;
+ }
+ }
+ // If this is about a class relationship:
+ if (classes.containsKey(s)) {
+ OwlClass subject = classes.get(s);
+ if ((p.equals(OWL.EQUIVALENTCLASS)
+ && (subject.getEquivalentClasses().contains(o)))
+ || (p.equals(OWL.DISJOINTWITH)
+ && (subject.getDisjointClasses().contains(o)))
+ || (p.equals(OWL.COMPLEMENTOF)
+ && (subject.getComplementaryClasses().contains(o)))
+ || (p.equals(RDFS.SUBCLASSOF)
+ && (subject.getSuperClasses().contains(o)))) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Collect and return counts of different kinds of schema constructs
+ */
+ public String getSummary() {
+ int nRestrictions = 0;
+ for (Resource r : classes.keySet()) {
+ OwlClass c = classes.get(r);
+ if (!c.getOnProperty().isEmpty()) {
+ nRestrictions++;
+ }
+ }
+ int nClasses = classes.size();
+ int nProperties = properties.size();
+ String[] pTypes = { "Transitive", "Symmetric", "Asymmetric",
+ "Functional", "Inverse Functional", "Irreflexive" };
+ String[] rTypes = { "someValuesFrom", "allValuesFrom", "hasValue",
+ "maxCardinality==0", "maxCardinality>0",
+ "maxQualifiedCardinality==0", "maxQualifiedCardinality>0", };
+ String[] edgeTypes = { "Superclass", "Disjoint class", "Complement",
+ "Superproperty", "Disjoint property", "Inverse property",
+ "Domain", "Range",
+ "Equivalent class", "Equivalent property"};
+ int[] pTotals = { 0, 0, 0, 0, 0, 0 };
+ int[] rTotals = { 0, 0, 0, 0, 0, 0, 0 };
+ int[] edgeTotals = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ for (OwlClass c : classes.values()) {
+ edgeTotals[0] += c.getSuperClasses().size() - 2;
+ edgeTotals[1] += c.getDisjointClasses().size();
+ edgeTotals[2] += c.getComplementaryClasses().size();
+ edgeTotals[8] += c.getEquivalentClasses().size() - 1;
+ if (!c.someValuesFrom().isEmpty()) rTotals[0]++;
+ if (!c.allValuesFrom().isEmpty()) rTotals[1]++;
+ if (!c.hasValue().isEmpty()) rTotals[2]++;
+ if (c.getMaxCardinality() == 0) rTotals[3]++;
+ if (c.getMaxCardinality() > 0) rTotals[4]++;
+ if (c.getMaxQualifiedCardinality() == 0) rTotals[5]++;
+ if (c.getMaxQualifiedCardinality() > 0) rTotals[6]++;
+ }
+ for (OwlProperty p : properties.values()) {
+ if (p.isTransitive()) pTotals[0]++;
+ if (p.isSymmetric()) pTotals[1]++;
+ if (p.isAsymmetric()) pTotals[2]++;
+ if (p.isFunctional()) pTotals[3]++;
+ if (p.isInverseFunctional()) pTotals[4]++;
+ if (p.isIrreflexive()) pTotals[5]++;
+ edgeTotals[3] += p.getSuperProperties().size() - 1;
+ edgeTotals[4] += p.getDisjointProperties().size();
+ edgeTotals[5] += p.getInverseProperties().size();
+ edgeTotals[6] += p.getDomain().size();
+ edgeTotals[7] += p.getRange().size();
+ edgeTotals[9] += p.getEquivalentProperties().size();
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("Schema summary:");
+ sb.append("\n\tClasses: " + nClasses);
+ sb.append("\n\t\tProperty Restrictions: ").append(nRestrictions);
+ for (int i = 0; i < rTypes.length; i++) {
+ sb.append("\n\t\t\t");
+ sb.append(rTypes[i]).append(": ").append(rTotals[i]);
+ }
+ sb.append("\n\t\tOther: ").append(nClasses-nRestrictions);
+ sb.append("\n\tProperties: ").append(nProperties);
+ for (int i = 0; i < pTypes.length; i++) {
+ sb.append("\n\t\t");
+ sb.append(pTypes[i]).append(": ").append(pTotals[i]);
+ }
+ sb.append("\n\tConnections:");
+ for (int i = 0; i < edgeTypes.length; i++) {
+ sb.append("\n\t\t");
+ sb.append(edgeTypes[i]).append(": ").append(edgeTotals[i]);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Assuming a given resource corresponds to a property restriction,
+ * describe the restriction.
+ */
+ public String explainRestriction(Resource type) {
+ StringBuilder sb = new StringBuilder();
+ if (classes.containsKey(type)) {
+ OwlClass pr = classes.get(type);
+ sb.append("owl:Restriction");
+ for (URI p : pr.getOnProperty()) {
+ sb.append(" (owl:onProperty ").append(p.toString()).append(")");
+ }
+ for (Value v : pr.hasValue()) {
+ sb.append(" (owl:hasValue ").append(v.toString()).append(")");
+ }
+ for (Resource c : pr.someValuesFrom()) {
+ sb.append(" (owl:someValuesFrom ").append(c.toString()).append(")");
+ }
+ for (Resource c : pr.allValuesFrom()) {
+ sb.append(" (owl:allValuesFrom ").append(c.toString()).append(")");
+ }
+ int mc = pr.getMaxCardinality();
+ int mqc = pr.getMaxQualifiedCardinality();
+ if (mc >= 0) {
+ sb.append(" (owl:maxCardinality ").append(mc).append(")");
+ }
+        if (mqc >= 0) {
+            sb.append(" (owl:maxQualifiedCardinality ").append(mqc).append(")");
+        }
+        for (Resource c : pr.onClass()) {
+            sb.append(" (owl:onClass ").append(c.toString()).append(")");
+        }
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
new file mode 100644
index 0000000..1d1ac8f
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java
@@ -0,0 +1,234 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+
+/**
+ * Keep track of a single node's types and do reasoning about its types.
+ */
+public class TypeReasoner extends AbstractReasoner {
+ // This node's types, whether asserted or derived
+ Map<Resource, Fact> knownTypes = new HashMap<>();
+
+ // Inferences waiting for particular types to be discovered
+ Map<Resource, List<Fact>> possibleInferences = new HashMap<>();
+ Map<Resource, List<Derivation>> possibleInconsistencies = new HashMap<>();
+
+ /**
+ * Constructor.
+ * @param node Conduct reasoning about/around this node
+ * @param schema Global schema (class/property) information
+ * @param t Iteration # of new triples
+ * @param tSchema Iteration # of latest schema change
+ */
+ public TypeReasoner(Resource node, Schema schema, int t, int tSchema) {
+ super(node, schema, t, tSchema);
+ }
+
+ /**
+ * Process a type (class) assignment from the input. It may have been
+ * inferred during a previous iteration.
+ * @param typeFact An assertion about one of this node's types
+ */
+ void processType(Fact typeFact) {
+ Resource type = (Resource) typeFact.getObject();
+ boolean newType = !knownTypes.containsKey(type);
+ int t = typeFact.getIteration();
+ // Save the type in memory unless older knowledge takes precedence
+ if (newType || t < knownTypes.get(type).getIteration()) {
+ knownTypes.put(type, typeFact);
+ // Perform further inference
+ typeInference(typeFact);
+ }
+ }
+
+ /**
+ * Produce and process a type derivation from this iteration.
+ * TODO: how to implement rules that would produce "literal rdf:type ?x"
+ * @param type The type itself
+ * @param rule The generating rule
+ * @param source The source of the derivation
+ */
+ void processType(Resource type, OwlRule rule, Fact source) {
+ processType(triple(node, RDF.TYPE, type, rule, source));
+ }
+
+ /**
+ * Infer additional information from a type assertion.
+ */
+ void typeInference(Fact typeFact) {
+ Resource type = (Resource) typeFact.getObject();
+ OwlClass c = schema.getClass(type);
+        // RL rule cls-nothing2: Inconsistent if type owl:Nothing
+        // (the frontier check skips facts that aren't new)
+        if (OWL.NOTHING.equals(type) && frontier(typeFact)) {
+            collectInconsistency(inconsistency(OwlRule.CLS_NOTHING2, typeFact));
+ }
+ // RL rule cax-dw: type shouldn't be disjointWith a previous type
+ Set<Resource> disjoint = c.getDisjointClasses();
+ disjoint.retainAll(knownTypes.keySet());
+ for (Resource other : disjoint) {
+ Fact otherTypeFact = knownTypes.get(other);
+ Derivation inc = inconsistency(OwlRule.CAX_DW, typeFact);
+ inc.addSource(otherTypeFact);
+ collectInconsistency(inc);
+ }
+ // RL rule cls-com: type shouldn't be complementOf a previous type
+ Set<Resource> complementary = c.getComplementaryClasses();
+ complementary.retainAll(knownTypes.keySet());
+ for (Resource other : complementary) {
+ Fact otherTypeFact = knownTypes.get(other);
+ Derivation inc = inconsistency(OwlRule.CLS_COM, typeFact);
+ inc.addSource(otherTypeFact);
+ collectInconsistency(inc);
+ }
+        // RL rule cax-sco: subClassOf semantics (derive superclasses)
+        // (skip if typeFact itself came from this rule, or if typeFact
+        // shouldn't be the sole source of new information)
+        if (!typeFact.hasRule(OwlRule.CAX_SCO)
+                && frontier(typeFact)) {
+ for (Resource supertype : c.getSuperClasses()) {
+ // If the supertype isn't trivial, assert it
+ if (!supertype.equals(type)
+ && !(supertype.equals(OWL.THING))) {
+ processType(supertype, OwlRule.CAX_SCO, typeFact);
+ }
+ }
+ }
+ // Apply property restriction rules:
+ for (URI prop : c.getOnProperty()) {
+ // RL rule cls-hv1: if type is an owl:hasValue restriction
+ for (Value val : c.hasValue()) {
+ collect(triple(node, prop, val, OwlRule.CLS_HV1, typeFact));
+ }
+ }
+ // Derive any facts whose explicit condition is this type assignment
+ if (possibleInferences.containsKey(type)) {
+ for (Fact fact : possibleInferences.get(type)) {
+ Fact join = fact.clone();
+ join.addSource(typeFact);
+ collect(join);
+ }
+ }
+ // Derive any inconsistencies whose explicit condition is this type
+ if (possibleInconsistencies.containsKey(type)) {
+ for (Derivation d : possibleInconsistencies.get(type)) {
+ Derivation inc = d.clone();
+ inc.addSource(typeFact);
+ collectInconsistency(inc);
+ }
+ }
+ }
+
+ /**
+ * Assert an arbitrary fact if and when this node is determined to have
+ * a particular type. Facilitates join rules specifically concerning type.
+ */
+ void onType(Resource type, Fact fact) {
+ if (!possibleInferences.containsKey(type)) {
+ possibleInferences.put(type, new LinkedList<Fact>());
+ }
+ possibleInferences.get(type).add(fact);
+ // If we already know the type, assert the fact right away.
+ if (knownTypes.containsKey(type)) {
+ Fact join = fact.clone();
+ join.addSource(knownTypes.get(type));
+ collect(join);
+ }
+ }
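+
+    // Illustrative use (hypothetical restriction R): a rule that should fire
+    // only once this node is known to be an instance of R calls onType(R, f);
+    // f is emitted immediately if R is already known, or later by
+    // typeInference() when "node rdf:type R" arrives.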
+
+ /**
+ * Assert an inconsistency if and when this node is determined to have
+ * a particular type. Facilitates join rules specifically concerning type.
+ */
+ void inconsistentOnType(Resource type, Derivation fact) {
+ if (!possibleInconsistencies.containsKey(type)) {
+ possibleInconsistencies.put(type, new LinkedList<Derivation>());
+ }
+ possibleInconsistencies.get(type).add(fact);
+ // If we already know the type, assert the fact right away.
+ if (knownTypes.containsKey(type)) {
+ Derivation d = fact.clone();
+ d.addSource(knownTypes.get(type));
+ collectInconsistency(d);
+ }
+ }
+
+ /**
+ * Collect all the type knowledge into the output, if it represents new
+ * information.
+ */
+ void collectTypes() {
+ for (Resource type : knownTypes.keySet()) {
+ collect(knownTypes.get(type));
+ }
+ }
+
+ /**
+ * Get info about types derived and potential inferences.
+ */
+ @Override
+ public String getDiagnostics() {
+ int total = 0;
+ int incTotal = 0;
+ for (Resource uri : possibleInferences.keySet()) {
+ total += possibleInferences.get(uri).size();
+ }
+ for (Resource uri : possibleInconsistencies.keySet()) {
+ incTotal += possibleInconsistencies.get(uri).size();
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append(knownTypes.size()).append(" total types known\n");
+ sb.append("Watching for ").append(possibleInferences.size());
+ sb.append(" distinct types to trigger any of ").append(total);
+ sb.append(" possible inferences");
+ sb.append("Watching for ").append(possibleInconsistencies.size());
+ sb.append(" distinct types to trigger any of ").append(incTotal);
+ sb.append(" possible inconsistencies");
+ return sb.toString();
+ }
+
+ /**
+ * Get the total number of input facts cached.
+ */
+ @Override
+ public int getNumStored() {
+ int total = knownTypes.size();
+ for (List<Fact> l : possibleInferences.values()) {
+ total += l.size();
+ }
+ for (List<Derivation> l : possibleInconsistencies.values()) {
+ total += l.size();
+ }
+ return total;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
new file mode 100644
index 0000000..dde83c6
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java
@@ -0,0 +1,492 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.accumulo.mr.fileinput.RdfFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.openrdf.rio.RDFFormat;
+
+/**
+ * Contains common functionality for MapReduce jobs involved in reasoning. A
+ * subclass should implement configureReasoningJob and its own mappers and
+ * reducers.
+ */
+public abstract class AbstractReasoningTool extends Configured implements Tool {
+ // Keep track of statistics about the input
+    protected enum COUNTERS { ABOX, TBOX, USEFUL }
+
+ // MapReduce job, to be configured by subclasses
+ protected Job job;
+
+ /**
+ * Configure the job's inputs, outputs, mappers, and reducers.
+ */
+    protected abstract void configureReasoningJob(String[] args) throws Exception;
+
+ /**
+ * Configure and run a MapReduce job.
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ Configuration conf = getConf();
+ job = Job.getInstance(conf);
+ job.setJobName(getJobName());
+ job.setJarByClass(this.getClass());
+ configureReasoningJob(args);
+ boolean success = job.waitForCompletion(!MRReasoningUtils.stats(conf));
+ if (success) {
+ return 0;
+ }
+ else {
+ return 1;
+ }
+ }
+
+ /**
+ * Cumulative CPU time taken by all mappers/reducers.
+ */
+ public long getCumulativeTime() throws IOException {
+ return getCounter(TaskCounter.CPU_MILLISECONDS);
+ }
+
+ /**
+ * Default name for the MapReduce job:
+ */
+ protected String getJobName() {
+ return "Rya reasoning, pass " + MRReasoningUtils.getCurrentIteration(getConf())
+ + ": " + this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+ }
+
+ /**
+ * Number of inconsistencies detected by this job.
+ */
+ public long getNumInconsistencies() throws IOException {
+ return getCounter(MultipleOutputs.class.getName(),
+ MRReasoningUtils.INCONSISTENT_OUT);
+ }
+
+ /**
+ * Number of new schema triples derived during this job.
+ */
+ public long getNumSchemaTriples() throws IOException {
+ return getCounter(MultipleOutputs.class.getName(),
+ MRReasoningUtils.SCHEMA_OUT);
+ }
+
+ /**
+     * Number of new instance triples that might be used for future reasoning.
+ */
+ public long getNumUsefulOutput() throws IOException {
+ return getCounter(MultipleOutputs.class.getName(),
+ MRReasoningUtils.INTERMEDIATE_OUT);
+ }
+
+ /**
+     * Number of new instance triples that will not be used for future reasoning.
+ */
+ public long getNumTerminalOutput() throws IOException {
+ return getCounter(MultipleOutputs.class.getName(),
+ MRReasoningUtils.TERMINAL_OUT);
+ }
+
+ /**
+ * Total number of new instance triples derived during this job.
+ */
+ public long getNumInstanceTriples() throws IOException {
+ return getNumUsefulOutput() + getNumTerminalOutput();
+ }
+
+ /**
+ * Number of instance triples seen as input during this job.
+ */
+ public long getNumInstanceInput() throws IOException {
+ return getCounter(COUNTERS.ABOX);
+ }
+
+ /**
+ * Number of schema triples seen as input during this job.
+ */
+ public long getNumSchemaInput() throws IOException {
+ return getCounter(COUNTERS.TBOX);
+ }
+
+ /**
+ * Increment the schema or instance triple counter, as appropriate.
+ */
+ protected static void countInput(boolean schema, TaskAttemptContext context) {
+ if (schema) {
+ context.getCounter(COUNTERS.TBOX).increment(1);
+ }
+ else {
+ context.getCounter(COUNTERS.ABOX).increment(1);
+ }
+ }
+
+ /**
+ * Add the schema file (TBox) to the distributed cache for the current job.
+ */
+ protected void distributeSchema() {
+ Path schemaPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
+ job.addCacheFile(schemaPath.toUri());
+ }
+
+ /**
+ * Set up the MapReduce job to use as inputs both an Accumulo table and the
+ * files containing previously derived information, excluding
+ * inconsistencies. Looks for a file for every iteration number so far,
+ * preferring final cleaned up output from that iteration but falling back
+ * on intermediate data if necessary.
+ * @param tableMapper Mapper class to use for database input
+ * @param rdfMapper Mapper class to use for direct RDF input
+ * @param fileMapper Mapper class to use for derived triples input
+ * @param filter True to exclude previously derived data that couldn't be
+ * used to derive anything new at this point.
+ */
+ protected void configureMultipleInput(
+ Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
+ Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
+ Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+ boolean filter) throws IOException, AccumuloSecurityException {
+ Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
+ if (inputPath != null) {
+ configureRdfInput(inputPath, rdfMapper);
+ }
+ else {
+ configureAccumuloInput(tableMapper);
+ }
+ configureFileInput(fileMapper, filter);
+ }
+
+ /**
+ * Set up the MapReduce job to use as inputs both an Accumulo table and the
+ * files containing previously derived information. Looks for a file for
+ * every iteration number so far, preferring final cleaned up output from
+ * that iteration but falling back on intermediate data if necessary.
+ * @param tableMapper Mapper class to use for database input
+ * @param rdfMapper Mapper class to use for direct RDF input
+ * @param fileMapper Mapper class to use for derived triples input
+ * @param incMapper Mapper class to use for derived inconsistencies input
+ * @param filter True to exclude previously derived data that couldn't be
+ * used to derive anything new at this point.
+ */
+ protected void configureMultipleInput(
+ Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
+ Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
+ Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+ Class<? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
+ boolean filter)
+ throws IOException, AccumuloSecurityException {
+ Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
+ if (inputPath != null) {
+ configureRdfInput(inputPath, rdfMapper);
+ }
+ else {
+ configureAccumuloInput(tableMapper);
+ }
+ configureFileInput(fileMapper, incMapper, filter);
+ }
+
+ /**
+ * Set up the MapReduce job to use file inputs from previous iterations,
+ * excluding inconsistencies found.
+ * @param fileMapper Mapper class to use for generated triples
+ * @param filter Exclude facts that aren't helpful for inference
+ */
+ protected void configureFileInput(
+ Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+ final boolean filter) throws IOException {
+ configureFileInput(fileMapper, null, filter);
+ }
+
+ /**
+ * Set up the MapReduce job to use file inputs from previous iterations.
+ * @param fileMapper Mapper class for generated triples
+     * @param incMapper Mapper class for generated inconsistencies
+ * @param filter Exclude facts that aren't helpful for inference
+ */
+ protected void configureFileInput(
+ Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
+ Class <? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
+ final boolean filter) throws IOException {
+ // Set up file input for all iterations up to this one
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath;
+ int iteration = MRReasoningUtils.getCurrentIteration(conf);
+ // Set min/max split, if not already provided:
+        long blocksize = conf.getLong("dfs.blocksize", 128 * 1024 * 1024);
+ String minSplitProp = "mapreduce.input.fileinputformat.split.minsize";
+ String maxSplitProp = "mapreduce.input.fileinputformat.split.maxsize";
+ conf.set(minSplitProp, conf.get(minSplitProp, String.valueOf(blocksize)));
+ conf.set(maxSplitProp, conf.get(maxSplitProp, String.valueOf(blocksize*8)));
+ for (int i = 1; i <= iteration; i++) {
+ // Prefer cleaned output...
+ inputPath = MRReasoningUtils.getOutputPath(conf,
+ MRReasoningUtils.OUTPUT_BASE + i);
+ // But if there isn't any, try intermediate data:
+ if (!fs.isDirectory(inputPath)) {
+ inputPath = MRReasoningUtils.getOutputPath(conf,
+ MRReasoningUtils.OUTPUT_BASE + i
+ + MRReasoningUtils.TEMP_SUFFIX);
+ }
+ // And only proceed if we found one or the other.
+ if (fs.isDirectory(inputPath)) {
+ // Never include debug output. If filter is true, select only
+ // intermediate and schema data, otherwise include everything.
+ PathFilter f = new PathFilter() {
+ public boolean accept(Path path) {
+ String s = path.getName();
+ if (s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
+ return false;
+ }
+ else {
+ return !filter
+ || s.startsWith(MRReasoningUtils.INTERMEDIATE_OUT)
+ || s.startsWith(MRReasoningUtils.SCHEMA_OUT);
+ }
+ }
+ };
+ for (FileStatus status : fs.listStatus(inputPath, f)) {
+ if (status.getLen() > 0) {
+ Path p = status.getPath();
+ String s = p.getName();
+ if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)) {
+ if (incMapper != null) {
+ MultipleInputs.addInputPath(job, p,
+ CombineSequenceFileInputFormat.class, incMapper);
+ }
+ }
+ else {
+ MultipleInputs.addInputPath(job, status.getPath(),
+ CombineSequenceFileInputFormat.class, fileMapper);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Set up the MapReduce job to use Accumulo as an input.
+ * @param tableMapper Mapper class to use
+ */
+ protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper)
+ throws AccumuloSecurityException {
+ MRReasoningUtils.configureAccumuloInput(job);
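+        // MultipleInputs requires a path argument, but AccumuloInputFormat
+        // reads from the configured table and ignores this placeholder.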
+ MultipleInputs.addInputPath(job, new Path("/tmp/input"),
+ AccumuloInputFormat.class, tableMapper);
+ }
+
+ /**
+ * Set up the MapReduce job to use an RDF file as an input.
+ * @param rdfMapper class to use
+ */
+ protected void configureRdfInput(Path inputPath,
+ Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
+ Configuration conf = job.getConfiguration();
+ String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
+ conf.set(MRUtils.FORMAT_PROP, format);
+ MultipleInputs.addInputPath(job, inputPath,
+ RdfFileInputFormat.class, rdfMapper);
+ }
+
+ /**
+ * Set up the MapReduce job to output a schema (TBox).
+ */
+ protected void configureSchemaOutput() {
+ Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
+ SequenceFileOutputFormat.setOutputPath(job, outPath);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(SchemaWritable.class);
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ MultipleOutputs.addNamedOutput(job, "schemaobj",
+ SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+ TextOutputFormat.class, Text.class, Text.class);
+ MultipleOutputs.setCountersEnabled(job, true);
+ }
+
+ /**
+ * Set up the MapReduce job to output newly derived triples. Outputs to
+ * directory [base]-[iteration].
+ */
+ protected void configureDerivationOutput() {
+ configureDerivationOutput(false);
+ }
+
+ /**
+ * Set up a MapReduce job to output newly derived triples.
+ * @param intermediate True if this is intermediate data. Outputs
+ * to [base]-[iteration]-[temp].
+ */
+ protected void configureDerivationOutput(boolean intermediate) {
+ Path outPath;
+ Configuration conf = job.getConfiguration();
+ int iteration = MRReasoningUtils.getCurrentIteration(conf);
+ if (intermediate) {
+ outPath = MRReasoningUtils.getOutputPath(conf,
+ MRReasoningUtils.OUTPUT_BASE + iteration
+ + MRReasoningUtils.TEMP_SUFFIX);
+ }
+ else {
+ outPath = MRReasoningUtils.getOutputPath(conf,
+ MRReasoningUtils.OUTPUT_BASE + iteration);
+ }
+ SequenceFileOutputFormat.setOutputPath(job, outPath);
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
+ SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
+ SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
+ SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
+ SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
+ MultipleOutputs.setCountersEnabled(job, true);
+ // Set up an output for diagnostic info, if needed
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+ TextOutputFormat.class, Text.class, Text.class);
+ }
+
+ /**
+ * Set up a MapReduce job to output human-readable text.
+ */
+ protected void configureTextOutput(String destination) {
+ Path outPath;
+ outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
+ TextOutputFormat.setOutputPath(job, outPath);
+ LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
+ TextOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
+ TextOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
+ TextOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
+ TextOutputFormat.class, NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
+ TextOutputFormat.class, Text.class, Text.class);
+ MultipleOutputs.setCountersEnabled(job, true);
+ }
+
+ /**
+ * Get the name of the output to send an inconsistency to.
+ * @return The name of the output file(s) to send inconsistencies to
+ */
+ protected static String getOutputName(Derivation inconsistency) {
+ return MRReasoningUtils.INCONSISTENT_OUT;
+ }
+
+ /**
+ * Get the name of the output to send a fact to.
+ * @param fact The fact itself
+ * @param finalOut True if this is for final output, not intermediate
+ * @return The name of the output file(s) to send this fact to
+ */
+ protected static String getOutputName(Fact fact, boolean finalOut) {
+ if (Schema.isSchemaTriple(fact.getTriple())) {
+ return MRReasoningUtils.SCHEMA_OUT;
+ }
+ else if (!finalOut && fact.isUseful()) {
+ return MRReasoningUtils.INTERMEDIATE_OUT;
+ }
+ else {
+ return MRReasoningUtils.TERMINAL_OUT;
+ }
+ }
+
+ /**
+ * Get the name of the output to send a fact to.
+ */
+ protected static String getOutputName(Fact fact) {
+ return getOutputName(fact, false);
+ }
+
+ /**
+ * Retrieve an arbitrary counter's value.
+ * @param group Counter's group name
+ * @param counter Name of the counter itself
+ */
+ public long getCounter(String group, String counter) throws IOException {
+ return job.getCounters().findCounter(group, counter).getValue();
+ }
+
+ /**
+ * Retrieve an arbitrary counter's value.
+ * @param key The Enum tied to this counter
+ */
+ public long getCounter(Enum<?> key) throws IOException {
+ return job.getCounters().findCounter(key).getValue();
+ }
+
+ /**
+ * Get the current iteration according to this job's configuration.
+ */
+ public int getIteration() {
+ return MRReasoningUtils.getCurrentIteration(getConf());
+ }
+
+ /**
+ * Get the job's JobID.
+ */
+ public JobID getJobID() {
+ return job.getJobID();
+ }
+
+ /**
+ * Get the elapsed wall-clock time, assuming the job is done.
+ */
+ public long getElapsedTime() throws IOException, InterruptedException {
+ return job.getFinishTime() - job.getStartTime();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
new file mode 100644
index 0000000..02cce66
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java
@@ -0,0 +1,436 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.OpenRDFException;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+import org.openrdf.rio.ntriples.NTriplesParser;
+import org.openrdf.rio.rdfxml.RDFXMLParser;
+import org.openrdf.sail.memory.MemoryStore;
+
+/**
+ * Test the reasoner against Owl conformance tests in the database.
+ */
+public class ConformanceTest extends Configured implements Tool {
+ static String TYPE = RDF.TYPE.stringValue();
+ static String TEST = "http://www.w3.org/2007/OWL/testOntology#";
+ static String TEST_CONSISTENCY = TEST + "ConsistencyTest";
+ static String TEST_INCONSISTENCY = TEST + "InconsistencyTest";
+ static String TEST_ENTAILMENT = TEST + "PositiveEntailmentTest";
+ static String TEST_NONENTAILMENT = TEST + "NegativeEntailmentTest";
+ static String TEST_ID = TEST + "identifier";
+ static String TEST_DESC = TEST + "description";
+ static String TEST_PROFILE = TEST + "profile";
+ static String TEST_PREMISE = TEST + "rdfXmlPremiseOntology";
+ static String TEST_CONCLUSION = TEST + "rdfXmlConclusionOntology";
+ static String TEST_NONCONCLUSION = TEST + "rdfXmlNonConclusionOntology";
+ static String TEST_RL = TEST + "RL";
+ static String TEST_SEMANTICS = TEST + "semantics";
+ static String TEST_RDFBASED = TEST + "RDF-BASED";
+
+ private static class OwlTest extends RDFHandlerBase {
+ Value uri;
+ String name;
+ String description;
+ String premise;
+ String compareTo;
+ Set<String> types = new HashSet<>();
+ boolean success;
+ Set<Statement> expected = new HashSet<>();
+ Set<Statement> unexpected = new HashSet<>();
+ Set<Statement> inferred = new HashSet<>();
+ Set<Statement> error = new HashSet<>();
+ @Override
+ public void handleStatement(Statement st) {
+ if (types.contains(TEST_ENTAILMENT)) {
+ expected.add(st);
+ }
+ else if (types.contains(TEST_NONENTAILMENT)) {
+ unexpected.add(st);
+ }
+ }
+ String type() {
+ StringBuilder sb = new StringBuilder();
+ if (types.contains(TEST_CONSISTENCY)) {
+ sb.append("{Consistency}");
+ }
+ if (types.contains(TEST_INCONSISTENCY)) {
+ sb.append("{Inconsistency}");
+ }
+ if (types.contains(TEST_ENTAILMENT)) {
+ sb.append("{Entailment}");
+ }
+ if (types.contains(TEST_NONENTAILMENT)) {
+ sb.append("{Nonentailment}");
+ }
+ return sb.toString();
+ }
+ }
+
+ private static class OutputCollector extends RDFHandlerBase {
+ Set<Statement> triples = new HashSet<>();
+ @Override
+ public void handleStatement(Statement st) {
+ triples.add(st);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new ConformanceTest(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ // Validate command
+ if (args.length < 1 || args.length > 2) {
+ System.out.println("Usage:\n");
+ System.out.println("\tConformanceTest [configuration options] "
+ + "<test-file> <temp-dir>\n");
+ System.out.println("to load test data from an RDF file "
+ + "(configuration property " + MRUtils.FORMAT_PROP
+ + " specifies the format, default RDF/XML); or\n");
+ System.out.println("\tConformanceTest [configuration options] <temp-dir>\n");
+ System.out.println("to load test data from a Rya instance (specified "
+ + "using standard configuration properties).\n");
+ System.out.println("For each test given, run the reasoner over the "
+ + "premise ontology using a temporary Mini Accumulo instance "
+ + "at <temp-dir>, then report conformance results.");
+ System.exit(1);
+ }
+
+ Set<Value> conformanceTestURIs = new HashSet<>();
+ Collection<OwlTest> conformanceTests = new LinkedList<>();
+ List<OwlTest> successes = new LinkedList<>();
+ List<OwlTest> failures = new LinkedList<>();
+ Configuration conf = getConf();
+ Repository repo;
+ File workingDir;
+
+ // If tests are in a file, stick them in a repository for querying
+ if (args.length == 2) {
+ workingDir = new File(args[1]);
+            RDFFormat inputFormat = RDFFormat.RDFXML;
+ String formatString = conf.get(MRUtils.FORMAT_PROP);
+ if (formatString != null) {
+ inputFormat = RDFFormat.valueOf(formatString);
+ }
+ repo = new SailRepository(new MemoryStore());
+ repo.initialize();
+ RepositoryConnection conn = repo.getConnection();
+ conn.add(new FileInputStream(args[0]), "", inputFormat);
+ conn.close();
+ }
+ // Otherwise, get a Rya repository
+ else {
+ workingDir = new File(args[0]);
+ repo = MRReasoningUtils.getRepository(conf);
+ repo.initialize();
+ }
+
+ // Query for the tests we're interested in
+ RepositoryConnection conn = repo.getConnection();
+ conformanceTestURIs.addAll(getTestURIs(conn, TEST_INCONSISTENCY));
+ conformanceTestURIs.addAll(getTestURIs(conn, TEST_CONSISTENCY));
+ conformanceTestURIs.addAll(getTestURIs(conn, TEST_ENTAILMENT));
+ conformanceTestURIs.addAll(getTestURIs(conn, TEST_NONENTAILMENT));
+ conformanceTests = getTests(conn, conformanceTestURIs);
+ conn.close();
+ repo.shutDown();
+
+ // Set up a MiniAccumulo cluster and set up conf to connect to it
+ String username = "root";
+ String password = "root";
+ MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
+ mini.start();
+ conf.set(MRUtils.AC_INSTANCE_PROP, mini.getInstanceName());
+ conf.set(MRUtils.AC_ZK_PROP, mini.getZooKeepers());
+ conf.set(MRUtils.AC_USERNAME_PROP, username);
+ conf.set(MRUtils.AC_PWD_PROP, password);
+ conf.setBoolean(MRUtils.AC_MOCK_PROP, false);
+ conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "temp_");
+ // Run the conformance tests
+ int result;
+ for (OwlTest test : conformanceTests) {
+ System.out.println(test.uri);
+ result = runTest(conf, args, test);
+ if (result != 0) {
+ return result;
+ }
+ if (test.success) {
+ successes.add(test);
+ System.out.println("(SUCCESS)");
+ }
+ else {
+ failures.add(test);
+ System.out.println("(FAIL)");
+ }
+ }
+ mini.stop();
+
+ System.out.println("\n" + successes.size() + " successful tests:");
+ for (OwlTest test : successes) {
+ System.out.println("\t[SUCCESS] " + test.type() + " " + test.name);
+ }
+ System.out.println("\n" + failures.size() + " failed tests:");
+ for (OwlTest test : failures) {
+ System.out.println("\t[FAIL] " + test.type() + " " + test.name);
+ System.out.println("\t\t(" + test.description + ")");
+ for (Statement triple : test.error) {
+ if (test.types.contains(TEST_ENTAILMENT)) {
+ System.out.println("\t\tExpected: " + triple);
+ }
+ else if (test.types.contains(TEST_NONENTAILMENT)) {
+ System.out.println("\t\tUnexpected: " + triple);
+ }
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Verify that we can infer the correct triples or detect an inconsistency.
+ * @param conf Specifies working directory, etc.
+     * @param args Passed through to the reasoning job
+     * @param test Contains premise/conclusion graphs; will store the result
+ * @return Return value of the MapReduce job
+ */
+ int runTest(Configuration conf, String[] args, OwlTest test)
+ throws Exception {
+ conf.setInt(MRReasoningUtils.STEP_PROP, 0);
+ conf.setInt(MRReasoningUtils.SCHEMA_UPDATE_PROP, 0);
+ conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, true);
+ conf.setBoolean(MRReasoningUtils.OUTPUT_FLAG, true);
+ // Connect to MiniAccumulo and load the test
+ Repository repo = MRReasoningUtils.getRepository(conf);
+ repo.initialize();
+ RepositoryConnection conn = repo.getConnection();
+ conn.clear();
+ conn.add(new StringReader(test.premise), "", RDFFormat.RDFXML);
+ conn.close();
+ repo.shutDown();
+ // Run the reasoner
+ ReasoningDriver reasoner = new ReasoningDriver();
+ int result = ToolRunner.run(conf, reasoner, args);
+ test.success = (result == 0);
+ // Inconsistency test: successful if determined inconsistent
+ if (test.types.contains(TEST_INCONSISTENCY)) {
+ test.success = test.success && reasoner.hasInconsistencies();
+ }
+ // Consistency test: successful if determined consistent
+ if (test.types.contains(TEST_CONSISTENCY)) {
+ test.success = test.success && !reasoner.hasInconsistencies();
+ }
+ // Other types: we'll need to look at the inferred triples/schema
+ if (test.types.contains(TEST_NONENTAILMENT)
+ || test.types.contains(TEST_ENTAILMENT)) {
+ System.out.println("Reading inferred triples...");
+ // Read in the inferred triples from HDFS:
+ Schema schema = MRReasoningUtils.loadSchema(conf);
+ FileSystem fs = FileSystem.get(conf);
+ Path path = MRReasoningUtils.getOutputPath(conf, "final");
+ OutputCollector inferred = new OutputCollector();
+ NTriplesParser parser = new NTriplesParser();
+ parser.setRDFHandler(inferred);
+ if (fs.isDirectory(path)) {
+ for (FileStatus status : fs.listStatus(path)) {
+ String s = status.getPath().getName();
+ if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)
+ || s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
+ continue;
+ }
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(fs.open(status.getPath())));
+ parser.parse(br, "");
+ br.close();
+ }
+ }
+ MRReasoningUtils.deleteIfExists(conf, "final");
+ test.inferred.addAll(inferred.triples);
+ // Entailment test: successful if expected triples were inferred
+ if (test.types.contains(TEST_ENTAILMENT)) {
+ // Check expected inferences against the inferred triples and
+ // the schema reasoner
+ for (Statement st : test.expected) {
+ Fact fact = new Fact(st);
+ if (!test.inferred.contains(st)
+ && !triviallyTrue(fact.getTriple(), schema)
+ && !schema.containsTriple(fact.getTriple())) {
+ test.error.add(st);
+ }
+ }
+ }
+ // Non-entailment test: failure if non-expected triples inferred
+ if (test.types.contains(TEST_NONENTAILMENT)) {
+ for (Statement st : test.unexpected) {
+ Fact fact = new Fact(st);
+ if (test.inferred.contains(st)
+ || schema.containsTriple(fact.getTriple())) {
+ test.error.add(st);
+ }
+ }
+ }
+ test.success = test.success && test.error.isEmpty();
+ }
+ conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, false);
+ MRReasoningUtils.clean(conf);
+ return result;
+ }
+
+ /**
+ * Query a connection for conformance tests matching a particular
+ * test type.
+ */
+ Set<Value> getTestURIs(RepositoryConnection conn, String testType)
+ throws IOException, OpenRDFException {
+ Set<Value> testURIs = new HashSet<>();
+ TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+ "select ?test where { " +
+ "?test <" + TYPE + "> <" + testType + "> .\n" +
+ "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
+ "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
+ "}");
+ TupleQueryResult queryResult = query.evaluate();
+ while (queryResult.hasNext()) {
+ BindingSet bindings = queryResult.next();
+ testURIs.add(bindings.getValue("test"));
+ }
+ queryResult.close();
+ return testURIs;
+ }
+
+ /**
+ * Query a connection for conformance test details.
+ */
+ Collection<OwlTest> getTests(RepositoryConnection conn, Set<Value> testURIs)
+ throws IOException, OpenRDFException {
+ Map<Value, OwlTest> tests = new HashMap<>();
+ TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+ "select * where { " +
+ "?test <" + TYPE + "> ?testType .\n" +
+ "?test <" + TEST_PREMISE + "> ?graph .\n" +
+ "?test <" + TEST_ID + "> ?name .\n" +
+ "?test <" + TEST_DESC + "> ?description .\n" +
+ "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
+ "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
+ "OPTIONAL {?test <" + TEST_CONCLUSION + "> ?conclusion .}\n" +
+ "OPTIONAL {?test <" + TEST_NONCONCLUSION + "> ?nonentailed .}\n" +
+ "}");
+ TupleQueryResult queryResult = query.evaluate();
+ while (queryResult.hasNext()) {
+ BindingSet bindings = queryResult.next();
+ Value uri = bindings.getValue("test");
+ if (testURIs.contains(uri)) {
+ OwlTest test;
+ if (tests.containsKey(uri)) {
+ test = tests.get(uri);
+ }
+ else {
+ test = new OwlTest();
+ test.uri = uri;
+ test.name = bindings.getValue("name").stringValue();
+ test.description = bindings.getValue("description").stringValue();
+ test.premise = bindings.getValue("graph").stringValue();
+ if (bindings.hasBinding("conclusion")) {
+ test.compareTo = bindings.getValue("conclusion").stringValue();
+ }
+ if (bindings.hasBinding("nonentailed")) {
+ test.compareTo = bindings.getValue("nonentailed").stringValue();
+ }
+ tests.put(uri, test);
+ }
+ test.types.add(bindings.getValue("testType").stringValue());
+ }
+ }
+ for (OwlTest test : tests.values()) {
+ if (test.compareTo != null) {
+ RDFXMLParser parser = new RDFXMLParser();
+ parser.setRDFHandler(test);
+ parser.parse(new StringReader(test.compareTo), "");
+ }
+ }
+ queryResult.close();
+ return tests.values();
+ }
+
+ /**
+ * Determine whether a statement is trivially true for purposes of
+ * entailment tests: for example, an implicit "[bnode] type Ontology"
+ * triple, or a "[class] type Class" triple as long as the class exists.
+ */
+ boolean triviallyTrue(Statement triple, Schema schema) {
+ Resource s = triple.getSubject();
+ URI p = triple.getPredicate();
+ Value o = triple.getObject();
+ if (p.equals(RDF.TYPE)) {
+ if (o.equals(OWL.ONTOLOGY)) {
+ return true;
+ }
+ else if (o.equals(OWL.CLASS)) {
+ return schema.hasClass(s);
+ }
+ else if ((o.equals(OWL.OBJECTPROPERTY)
+ || o.equals(OWL.DATATYPEPROPERTY))
+ && s instanceof URI) {
+ // The object/datatype property distinction isn't maintained;
+ // it's irrelevant to the RL rules
+ return schema.hasProperty((URI) s);
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
new file mode 100644
index 0000000..3e54367
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java
@@ -0,0 +1,225 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class DuplicateElimination extends AbstractReasoningTool {
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new DuplicateElimination(), args));
+ }
+
+ @Override
+ protected void configureReasoningJob(String[] args) throws Exception {
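+ // Collect <fact, derivation> pairs from the table, RDF files, and
+ // previous steps' facts and inconsistencies; the reducer then drops
+ // anything already known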
+ configureMultipleInput(DuplicateTableMapper.class,
+ DuplicateRdfMapper.class, DuplicateFileMapper.class,
+ InconsistencyMapper.class, false);
+ job.setMapOutputKeyClass(Fact.class);
+ job.setMapOutputValueClass(Derivation.class);
+ job.setReducerClass(DuplicateEliminationReducer.class);
+ configureDerivationOutput();
+ }
+
+ public static class DuplicateEliminationMapper<K, V> extends Mapper<K, V,
+ Fact, Derivation> {
+ private MultipleOutputs<?, ?> debugOut;
+ private boolean debug;
+ private Text debugK = new Text();
+ private Text debugV = new Text();
+ private Fact emptyFact = new Fact();
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ debug = MRReasoningUtils.debug(conf);
+ if (debug) {
+ debugOut = new MultipleOutputs<>(context);
+ }
+ }
+ @Override
+ public void cleanup(Context context) throws IOException,
+ InterruptedException {
+ if (debugOut != null) {
+ debugOut.close();
+ }
+ }
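+ // Write a <fact, derivation> pair to the reducer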
+ protected void process(Context context, Fact fact, Derivation d,
+ String source) throws IOException, InterruptedException {
+ context.write(fact, d);
+ }
+
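+ // Log the fact if debugging, then detach its derivation and emit both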
+ protected void process(Context context, Fact fact,
+ String source) throws IOException, InterruptedException {
+ if (debug) {
+ debugK.set("MAP_" + source + ": " + fact.explain(false));
+ debugV.set("iteration=" + fact.getIteration()
+ + ", size=" + fact.span());
+ debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+ }
+ Derivation d = fact.unsetDerivation();
+ process(context, fact, d, source);
+ }
+
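+ // An inconsistency has a derivation but no triple; emit it attached
+ // to an empty placeholder fact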
+ protected void process(Context context, Derivation d,
+ String source) throws IOException, InterruptedException {
+ if (debug) {
+ debugK.set("MAP_" + source + ": inconsistency : "
+ + d.explain(false));
+ debugV.set("iteration=" + d.getIteration()
+ + ", size=" + d.span());
+ debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+ }
+ emptyFact.setDerivation(d);
+ process(context, emptyFact, d, source);
+ }
+ }
+
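+ /**
+ * Read facts that already exist in the Rya table.
+ */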
+ public static class DuplicateTableMapper extends DuplicateEliminationMapper<
+ Key, Value> {
+ private Fact inputTriple = new Fact();
+ @Override
+ public void map(Key row, Value data, Context context)
+ throws IOException, InterruptedException {
+ inputTriple.setTriple(MRReasoningUtils.getStatement(row, data,
+ context.getConfiguration()));
+ process(context, inputTriple, "TABLE");
+ }
+ }
+
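+ /**
+ * Read facts generated by a previous reasoning step.
+ */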
+ public static class DuplicateFileMapper extends DuplicateEliminationMapper<
+ Fact, NullWritable> {
+ @Override
+ public void map(Fact key, NullWritable value, Context context)
+ throws IOException, InterruptedException {
+ process(context, key, "STEP-" + key.getIteration());
+ }
+ }
+
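+ /**
+ * Read statements from RDF file input.
+ */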
+ public static class DuplicateRdfMapper extends DuplicateEliminationMapper<
+ LongWritable, RyaStatementWritable> {
+ private Fact inputTriple = new Fact();
+ @Override
+ public void map(LongWritable key, RyaStatementWritable value,
+ Context context) throws IOException, InterruptedException {
+ inputTriple.setTriple(value.getRyaStatement());
+ process(context, inputTriple, "RDF");
+ }
+ }
+
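+ /**
+ * Read inconsistencies generated by a previous reasoning step.
+ */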
+ public static class InconsistencyMapper extends DuplicateEliminationMapper<
+ Derivation, NullWritable> {
+ @Override
+ public void map(Derivation key, NullWritable value, Context context)
+ throws IOException, InterruptedException {
+ process(context, key, "INCONSISTENCY-STEP-" + key.getIteration());
+ }
+ }
+
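+ /**
+ * Output each fact only if it was newly derived this iteration,
+ * keeping the simplest derivation if there are several.
+ */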
+ public static class DuplicateEliminationReducer extends Reducer<
+ Fact, Derivation, Fact, NullWritable> {
+ protected MultipleOutputs<?, ?> mout;
+ protected int current;
+ protected boolean debug;
+ protected Logger log = Logger.getLogger(DuplicateEliminationReducer.class);
+ protected long totalInput = 0;
+ protected long totalFacts = 0;
+ protected long totalOutput = 0;
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ mout = new MultipleOutputs<>(context);
+ current = MRReasoningUtils.getCurrentIteration(conf);
+ debug = MRReasoningUtils.debug(conf);
+ }
+ @Override
+ public void cleanup(Context context) throws IOException,
+ InterruptedException {
+ mout.close();
+ log.info("Input records processed: " + totalInput);
+ log.info("Distinct facts: " + totalFacts);
+ log.info("Output facts: " + totalOutput);
+ }
+ @Override
+ public void reduce(Fact fact, Iterable<Derivation> derivations,
+ Context context) throws IOException, InterruptedException {
+ log.debug(fact.toString() + ":");
+ totalFacts++;
+ // We only need to output this fact if it hasn't been derived
+ // before this step (and wasn't in the original data, marked
+ // as iteration 0). If we do want to output it, prefer the simplest
+ // derivation.
+ Derivation best = null;
+ boolean newFact = true;
+ int count = 0;
+ for (Derivation derivation : derivations) {
+ count++;
+ if (newFact) {
+ if (derivation.getIteration() >= current) {
+ // Valid so far; check if this is the best derivation:
+ if (best == null || best.span() > derivation.span()) {
+ best = derivation.clone();
+ }
+ }
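+ // Derived before this step: it's a duplicate. Keep scanning
+ // only if we're logging debug output; otherwise stop early.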
+ else if (debug) {
+ newFact = false;
+ }
+ else {
+ return;
+ }
+ }
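+ // In debug mode, record every derivation seen for this fact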
+ if (debug) {
+ mout.write(MRReasoningUtils.DEBUG_OUT,
+ new Text("DE " + fact.toString() + derivation.explain(false)),
+ new Text(Integer.toString(count) + "\t" + newFact));
+ }
+ }
+ totalInput += count;
+ if (newFact) {
+ totalOutput++;
+ if (fact.isEmpty()) {
+ // If there's no triple, it must be an inconsistency
+ mout.write(getOutputName(best), best, NullWritable.get());
+ }
+ else {
+ // Output a triple
+ fact.setDerivation(best);
+ mout.write(getOutputName(fact), fact, NullWritable.get());
+ }
+ }
+ log.debug(totalFacts + " facts, " + totalInput + " input records, "
+ + totalOutput + " output records");
+ }
+ }
+}