You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/06/21 17:00:21 UTC
[06/10] incubator-rya git commit: RYA-51 Temporal Indexing mongo support
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index 12791a6..be29d3c 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo.temporal;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -67,16 +67,17 @@ import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.indexing.KeyParts;
-import mvm.rya.indexing.StatementContraints;
+import mvm.rya.indexing.StatementConstraints;
+import mvm.rya.indexing.StatementSerializer;
import mvm.rya.indexing.TemporalIndexer;
import mvm.rya.indexing.TemporalInstant;
+import mvm.rya.indexing.TemporalInstantRfc3339;
import mvm.rya.indexing.TemporalInterval;
import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.StatementSerializer;
public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer {
- private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class);
+ private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class);
private static final String CF_INTERVAL = "interval";
@@ -115,22 +116,22 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
//initialization occurs in setConf because index is created using reflection
@Override
- public void setConf(Configuration conf) {
+ public void setConf(final Configuration conf) {
this.conf = conf;
if (!isInit) {
try {
initInternal();
isInit = true;
- } catch (AccumuloException e) {
+ } catch (final AccumuloException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
- } catch (AccumuloSecurityException e) {
+ } catch (final AccumuloSecurityException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
- } catch (TableNotFoundException e) {
+ } catch (final TableNotFoundException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
- } catch (TableExistsException e) {
+ } catch (final TableExistsException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
}
@@ -139,7 +140,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
@Override
public Configuration getConf() {
- return this.conf;
+ return conf;
}
@@ -152,34 +153,36 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* if that fails, tries: org.joda.time.DateTime.parse() .
* T O D O parse an interval using multiple predicates for same subject -- ontology dependent.
*/
- private void storeStatement(Statement statement) throws IOException, IllegalArgumentException {
+ private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException {
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
- boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
- if (!isValidPredicate || !(statement.getObject() instanceof Literal))
+ final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
+ if (!isValidPredicate || !(statement.getObject() instanceof Literal)) {
return;
- DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
+ }
+ final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
extractDateTime(statement, indexDateTimes);
- if (indexDateTimes[0]==null)
- return;
+ if (indexDateTimes[0]==null) {
+ return;
+ }
// Add this as an instant, or interval.
try {
if (indexDateTimes[1] != null) {
- TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1]));
+ final TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1]));
addInterval(temporalIndexBatchWriter, interval, statement);
} else {
- TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]);
+ final TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]);
addInstant(temporalIndexBatchWriter, instant, statement);
}
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
throw new IOException("While adding interval/instant for statement =" + statement, e);
}
}
@Override
- public void storeStatement(RyaStatement statement) throws IllegalArgumentException, IOException {
+ public void storeStatement(final RyaStatement statement) throws IllegalArgumentException, IOException {
storeStatement(RyaToRdfConversions.convertStatement(statement));
}
@@ -191,45 +194,46 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param statement
* @param outputDateTimes
*/
- private void extractDateTime(Statement statement, DateTime[] outputDateTimes) {
- if (!(statement.getObject() instanceof Literal)) // Error since it should already be tested by caller.
+ private void extractDateTime(final Statement statement, final DateTime[] outputDateTimes) {
+ if (!(statement.getObject() instanceof Literal)) {
throw new RuntimeException("Statement's object must be a literal: " + statement);
+ }
// throws IllegalArgumentException NumberFormatException if can't parse
- String logThis = null; Literal literalValue = (Literal) statement.getObject();
+ String logThis = null; final Literal literalValue = (Literal) statement.getObject();
// First attempt to parse a interval in the form "[date1,date2]"
- Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(literalValue.stringValue());
+ final Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(literalValue.stringValue());
if (matcher.find()) {
- try {
- // Got a datetime pair, parse into an interval.
- outputDateTimes[0] = new DateTime(matcher.group(1));
- outputDateTimes[1] = new DateTime(matcher.group(2));
- return;
- } catch (java.lang.IllegalArgumentException e) {
+ try {
+ // Got a datetime pair, parse into an interval.
+ outputDateTimes[0] = new DateTime(matcher.group(1));
+ outputDateTimes[1] = new DateTime(matcher.group(2));
+ return;
+ } catch (final java.lang.IllegalArgumentException e) {
logThis = e.getMessage() + " " + logThis;
outputDateTimes[0]=null;
outputDateTimes[1]=null;
- }
- }
-
- try {
- XMLGregorianCalendar calendarValue = literalValue.calendarValue();
- outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar());
- outputDateTimes[1] = null;
- return;
- } catch (java.lang.IllegalArgumentException e) {
+ }
+ }
+
+ try {
+ final XMLGregorianCalendar calendarValue = literalValue.calendarValue();
+ outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar());
+ outputDateTimes[1] = null;
+ return;
+ } catch (final java.lang.IllegalArgumentException e) {
logThis = e.getMessage();
- }
- // Try again using Joda Time DateTime.parse()
- try {
- outputDateTimes[0] = DateTime.parse(literalValue.stringValue());
- outputDateTimes[1] = null;
- //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue());
- return;
- } catch (java.lang.IllegalArgumentException e) {
+ }
+ // Try again using Joda Time DateTime.parse()
+ try {
+ outputDateTimes[0] = DateTime.parse(literalValue.stringValue());
+ outputDateTimes[1] = null;
+ //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue());
+ return;
+ } catch (final java.lang.IllegalArgumentException e) {
logThis = e.getMessage() + " " + logThis;
- }
+ }
logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " +logThis);
- return;
+ return;
}
/**
@@ -240,10 +244,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param interval
* @throws MutationsRejectedException
*/
- public void removeInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException {
- Text cf = new Text(StatementSerializer.writeContext(statement));
- Text cqBegin = new Text(KeyParts.CQ_BEGIN);
- Text cqEnd = new Text(KeyParts.CQ_END);
+ public void removeInterval(final BatchWriter writer, final TemporalInterval interval, final Statement statement) throws MutationsRejectedException {
+ final Text cf = new Text(StatementSerializer.writeContext(statement));
+ final Text cqBegin = new Text(KeyParts.CQ_BEGIN);
+ final Text cqEnd = new Text(KeyParts.CQ_END);
// Start Begin index
Text keyText = new Text(interval.getAsKeyBeginning());
@@ -268,10 +272,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param instant
* @throws MutationsRejectedException
*/
- public void removeInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException {
- KeyParts keyParts = new KeyParts(statement, instant);
- for (KeyParts k: keyParts) {
- Mutation m = new Mutation(k.getStoreKey());
+ public void removeInstant(final BatchWriter writer, final TemporalInstant instant, final Statement statement) throws MutationsRejectedException {
+ final KeyParts keyParts = new KeyParts(statement, instant);
+ for (final KeyParts k: keyParts) {
+ final Mutation m = new Mutation(k.getStoreKey());
m.putDelete(k.cf, k.cq);
writer.addMutation(m);
}
@@ -285,12 +289,12 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param interval
* @throws MutationsRejectedException
*/
- public void addInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException {
+ public void addInterval(final BatchWriter writer, final TemporalInterval interval, final Statement statement) throws MutationsRejectedException {
- Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement)));
- Text cf = new Text(StatementSerializer.writeContext(statement));
- Text cqBegin = new Text(KeyParts.CQ_BEGIN);
- Text cqEnd = new Text(KeyParts.CQ_END);
+ final Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement)));
+ final Text cf = new Text(StatementSerializer.writeContext(statement));
+ final Text cqBegin = new Text(KeyParts.CQ_BEGIN);
+ final Text cqEnd = new Text(KeyParts.CQ_END);
// Start Begin index
Text keyText = new Text(interval.getAsKeyBeginning());
@@ -321,10 +325,10 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param instant
* @throws MutationsRejectedException
*/
- public void addInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException {
- KeyParts keyParts = new KeyParts(statement, instant);
- for (KeyParts k : keyParts) {
- Mutation m = new Mutation(k.getStoreKey());
+ public void addInstant(final BatchWriter writer, final TemporalInstant instant, final Statement statement) throws MutationsRejectedException {
+ final KeyParts keyParts = new KeyParts(statement, instant);
+ for (final KeyParts k : keyParts) {
+ final Mutation m = new Mutation(k.getStoreKey());
m.put(k.cf, k.cq,k.getValue());
writer.addMutation(m);
}
@@ -339,16 +343,16 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @throws IOException
*/
private Scanner getScanner() throws QueryEvaluationException {
- String whileDoing = "While creating a scanner for a temporal query. table name=" + temporalIndexTableName;
+ final String whileDoing = "While creating a scanner for a temporal query. table name=" + temporalIndexTableName;
Scanner scanner = null;
try {
scanner = ConfigUtils.createScanner(temporalIndexTableName, conf);
- } catch (AccumuloException e) {
+ } catch (final AccumuloException e) {
logger.error(whileDoing, e);
throw new QueryEvaluationException(whileDoing, e);
- } catch (AccumuloSecurityException e) {
+ } catch (final AccumuloSecurityException e) {
throw new QueryEvaluationException(whileDoing, e);
- } catch (TableNotFoundException e) {
+ } catch (final TableNotFoundException e) {
logger.error(whileDoing, e);
throw new QueryEvaluationException(whileDoing
+ " The temporal index table should have been created by this constructor, if found missing.", e);
@@ -356,21 +360,21 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
return scanner;
}
- private BatchScanner getBatchScanner() throws QueryEvaluationException {
- String whileDoing = "While creating a Batch scanner for a temporal query. table name=" + temporalIndexTableName;
- try {
- return ConfigUtils.createBatchScanner(temporalIndexTableName, conf);
- } catch (AccumuloException e) {
- logger.error(whileDoing, e);
- throw new QueryEvaluationException(whileDoing, e);
- } catch (AccumuloSecurityException e) {
- throw new QueryEvaluationException(whileDoing, e);
- } catch (TableNotFoundException e) {
- logger.error(whileDoing, e);
- throw new QueryEvaluationException(whileDoing
- + " The temporal index table should have been created by this constructor, if found missing. ", e);
- }
- }
+ private BatchScanner getBatchScanner() throws QueryEvaluationException {
+ final String whileDoing = "While creating a Batch scanner for a temporal query. table name=" + temporalIndexTableName;
+ try {
+ return ConfigUtils.createBatchScanner(temporalIndexTableName, conf);
+ } catch (final AccumuloException e) {
+ logger.error(whileDoing, e);
+ throw new QueryEvaluationException(whileDoing, e);
+ } catch (final AccumuloSecurityException e) {
+ throw new QueryEvaluationException(whileDoing, e);
+ } catch (final TableNotFoundException e) {
+ logger.error(whileDoing, e);
+ throw new QueryEvaluationException(whileDoing
+ + " The temporal index table should have been created by this constructor, if found missing. ", e);
+ }
+ }
/**
@@ -378,18 +382,18 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(
- TemporalInstant queryInstant, StatementContraints constraints)
+ final TemporalInstant queryInstant, final StatementConstraints constraints)
throws QueryEvaluationException {
// get rows where the repository time is equal to the given time in queryInstant.
- Query query = new Query() {
- @Override
- public Range getRange(KeyParts keyParts) {
- //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey()));
- return Range.prefix(keyParts.getQueryKey()); // <-- specific logic
- }
- };
- ScannerBase scanner = query.doQuery(queryInstant, constraints);
- // TODO currently context constraints are filtered on the client.
+ final Query query = new Query() {
+ @Override
+ public Range getRange(final KeyParts keyParts) {
+ //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey()));
+ return Range.prefix(keyParts.getQueryKey()); // <-- specific logic
+ }
+ };
+ final ScannerBase scanner = query.doQuery(queryInstant, constraints);
+ // TODO currently context constraints are filtered on the client.
return getContextIteratorWrapper(scanner, constraints.getContext());
}
@@ -398,24 +402,25 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant(
- TemporalInstant queryInstant, StatementContraints constraints)
+ final TemporalInstant queryInstant, final StatementConstraints constraints)
throws QueryEvaluationException {
// get rows where the repository time is before the given time.
- Query query = new Query() {
- @Override
- public Range getRange(KeyParts keyParts) {
- Text start= null;
- if (keyParts.constraintPrefix != null ) // Yes, has constraints
- start = keyParts.constraintPrefix; // <-- start specific logic
- else
- start = new Text(KeyParts.HASH_PREFIX_FOLLOWING);
- Text endAt = keyParts.getQueryKey(); // <-- end specific logic
- //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
- return new Range(start, true, endAt, false);
- }
- };
- ScannerBase scanner = query.doQuery(queryInstant, constraints);
- return getContextIteratorWrapper(scanner, constraints.getContext());
+ final Query query = new Query() {
+ @Override
+ public Range getRange(final KeyParts keyParts) {
+ Text start= null;
+ if (keyParts.constraintPrefix != null ) {
+ start = keyParts.constraintPrefix; // <-- start specific logic
+ } else {
+ start = new Text(KeyParts.HASH_PREFIX_FOLLOWING);
+ }
+ final Text endAt = keyParts.getQueryKey(); // <-- end specific logic
+ //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
+ return new Range(start, true, endAt, false);
+ }
+ };
+ final ScannerBase scanner = query.doQuery(queryInstant, constraints);
+ return getContextIteratorWrapper(scanner, constraints.getContext());
}
/**
@@ -423,29 +428,30 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(
- TemporalInstant queryInstant, StatementContraints constraints)
+ final TemporalInstant queryInstant, final StatementConstraints constraints)
throws QueryEvaluationException {
- Query query = new Query() {
- @Override
- public Range getRange(KeyParts keyParts) {
- Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic
- Text endAt = null; // no constraints // <-- specific logic
- if (keyParts.constraintPrefix != null ) // Yes, has constraints
- endAt = Range.followingPrefix(keyParts.constraintPrefix);
- //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
- return new Range(start, true, endAt, false);
- }
- };
- ScannerBase scanner = query.doQuery(queryInstant, constraints);
- return getContextIteratorWrapper(scanner, constraints.getContext());
- }
-
- /**
+ final Query query = new Query() {
+ @Override
+ public Range getRange(final KeyParts keyParts) {
+ final Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic
+ Text endAt = null; // no constraints // <-- specific logic
+ if (keyParts.constraintPrefix != null ) {
+ endAt = Range.followingPrefix(keyParts.constraintPrefix);
+ }
+ //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
+ return new Range(start, true, endAt, false);
+ }
+ };
+ final ScannerBase scanner = query.doQuery(queryInstant, constraints);
+ return getContextIteratorWrapper(scanner, constraints.getContext());
+ }
+
+ /**
* Get instances before a given interval. Returns queryInstantBeforeInstant with the interval's beginning time.
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval(
- TemporalInterval givenInterval, StatementContraints contraints)
+ final TemporalInterval givenInterval, final StatementConstraints contraints)
throws QueryEvaluationException {
return queryInstantBeforeInstant(givenInterval.getHasBeginning(), contraints);
}
@@ -455,7 +461,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval(
- TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException {
+ final TemporalInterval givenInterval, final StatementConstraints contraints) throws QueryEvaluationException {
return queryInstantAfterInstant(givenInterval.getHasEnd(), contraints);
}
@@ -465,30 +471,30 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* exclusive (don't match the beginning and ending).
*/
@Override
- public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(
- TemporalInterval queryInterval, StatementContraints constraints)
- throws QueryEvaluationException {
- // get rows where the time is after the given interval's beginning time and before the ending time.
- final TemporalInterval theQueryInterval = queryInterval;
- Query query = new Query() {
- private final TemporalInterval queryInterval = theQueryInterval;
- @Override
- public Range getRange(KeyParts keyParts) {
- Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning())));
- Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic
- //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
- return new Range(start, false, endAt, false);
- }
- };
- ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints);
- return getContextIteratorWrapper(scanner, constraints.getContext());
- }
+ public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(
+ final TemporalInterval queryInterval, final StatementConstraints constraints)
+ throws QueryEvaluationException {
+ // get rows where the time is after the given interval's beginning time and before the ending time.
+ final TemporalInterval theQueryInterval = queryInterval;
+ final Query query = new Query() {
+ private final TemporalInterval queryInterval = theQueryInterval;
+ @Override
+ public Range getRange(final KeyParts keyParts) {
+ final Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning())));
+ final Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic
+ //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
+ return new Range(start, false, endAt, false);
+ }
+ };
+ final ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints);
+ return getContextIteratorWrapper(scanner, constraints.getContext());
+ }
/**
* Get instances matching the beginning of a given interval.
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval(
- TemporalInterval queryInterval, StatementContraints contraints)
+ final TemporalInterval queryInterval, final StatementConstraints contraints)
throws QueryEvaluationException {
return queryInstantEqualsInstant(queryInterval.getHasBeginning(), contraints);
}
@@ -498,7 +504,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
*/
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval(
- TemporalInterval queryInterval, StatementContraints contraints)
+ final TemporalInterval queryInterval, final StatementConstraints contraints)
throws QueryEvaluationException {
return queryInstantEqualsInstant(queryInterval.getHasEnd(), contraints);
}
@@ -509,116 +515,121 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* Currently predicate and subject constraints are filtered on the client.
*/
@Override
- public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(
- TemporalInterval query, StatementContraints contraints)
- throws QueryEvaluationException {
- Scanner scanner = getScanner();
- if (scanner != null) {
- // get rows where the start and end match.
- Range range = Range.prefix(new Text(query.getAsKeyBeginning()));
- scanner.setRange(range);
- if (contraints.hasContext())
- scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
- else
- scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
- }
- // Iterator<Entry<Key, Value>> iter = scanner.iterator();
- // while (iter.hasNext()) {
- // System.out.println("queryIntervalEquals results:"+iter.next());
- // }
- //return getConstrainedIteratorWrapper(scanner, contraints);
- return getIteratorWrapper(scanner);
- }
-
- /**
- * find intervals stored in the repository before the given Interval. Find interval endings that are
- * before the given beginning.
+ public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(
+ final TemporalInterval query, final StatementConstraints contraints)
+ throws QueryEvaluationException {
+ final Scanner scanner = getScanner();
+ if (scanner != null) {
+ // get rows where the start and end match.
+ final Range range = Range.prefix(new Text(query.getAsKeyBeginning()));
+ scanner.setRange(range);
+ if (contraints.hasContext()) {
+ scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
+ } else {
+ scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
+ }
+ }
+ // Iterator<Entry<Key, Value>> iter = scanner.iterator();
+ // while (iter.hasNext()) {
+ // System.out.println("queryIntervalEquals results:"+iter.next());
+ // }
+ //return getConstrainedIteratorWrapper(scanner, contraints);
+ return getIteratorWrapper(scanner);
+ }
+
+ /**
+ * find intervals stored in the repository before the given Interval. Find interval endings that are
+ * before the given beginning.
* Indexing Intervals will probably change or be removed.
* Currently predicate and subject constraints are filtered on the client.
- */
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(
- TemporalInterval queryInterval, StatementContraints constraints) throws QueryEvaluationException
- {
- Scanner scanner = getScanner();
- if (scanner != null) {
- // get rows where the end date is less than the queryInterval.getBefore()
- Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false);
- scanner.setRange(range);
- if (constraints.hasContext())
- scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END));
- else
- scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END));
- }
- return getIteratorWrapper(scanner);
- }
-
- /**
- * Interval after given interval. Find intervals that begin after the endings of the given interval.
- * Use the special following prefix mechanism to avoid matching the beginning date.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(
+ final TemporalInterval queryInterval, final StatementConstraints constraints) throws QueryEvaluationException
+ {
+ final Scanner scanner = getScanner();
+ if (scanner != null) {
+ // get rows where the end date is less than the queryInterval.getBefore()
+ final Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false);
+ scanner.setRange(range);
+ if (constraints.hasContext()) {
+ scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END));
+ } else {
+ scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END));
+ }
+ }
+ return getIteratorWrapper(scanner);
+ }
+
+ /**
+ * Interval after given interval. Find intervals that begin after the endings of the given interval.
+ * Use the special following prefix mechanism to avoid matching the beginning date.
* Indexing Intervals will probably change or be removed.
* Currently predicate and subject and context constraints are filtered on the client.
- */
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(
- TemporalInterval queryInterval, StatementContraints constraints)
- throws QueryEvaluationException {
-
- Scanner scanner = getScanner();
- if (scanner != null) {
- // get rows where the start date is greater than the queryInterval.getEnd()
- Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true);
- scanner.setRange(range);
-
- if (constraints.hasContext())
- scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
- else
- scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
- }
- // TODO currently predicate, subject and context constraints are filtered on the clients
- return getIteratorWrapper(scanner);
- }
- // --
- // -- END of Query functions. Next up, general stuff used by the queries above.
- // --
-
- /**
- * Allows passing range specific logic into doQuery.
- * Each query function implements an anonymous instance of this and calls it's doQuery().
- */
- abstract class Query {
- abstract protected Range getRange(KeyParts keyParts);
-
- public ScannerBase doQuery(TemporalInstant queryInstant, StatementContraints constraints) throws QueryEvaluationException {
- // key is contraintPrefix + time, or just time.
- // Any constraints handled here, if the constraints are empty, the
- // thisKeyParts.contraintPrefix will be null.
- List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints);
- ScannerBase scanner = null;
- if (keyParts.size() > 1)
- scanner = getBatchScanner();
- else
- scanner = getScanner();
-
- Collection<Range> ranges = new HashSet<Range>();
- KeyParts lastKeyParts = null;
- Range range = null;
- for (KeyParts thisKeyParts : keyParts) {
- range = this.getRange(thisKeyParts);
- ranges.add(range);
- lastKeyParts = thisKeyParts;
- }
- //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq);
- scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq));
- if (scanner instanceof BatchScanner)
- ((BatchScanner) scanner).setRanges(ranges);
- else if (range != null)
- ((Scanner) scanner).setRange(range);
- return scanner;
- }
- }
-
- /**
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(
+ final TemporalInterval queryInterval, final StatementConstraints constraints)
+ throws QueryEvaluationException {
+
+ final Scanner scanner = getScanner();
+ if (scanner != null) {
+ // get rows where the start date is greater than the queryInterval.getEnd()
+ final Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true);
+ scanner.setRange(range);
+
+ if (constraints.hasContext()) {
+ scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN));
+ } else {
+ scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN));
+ }
+ }
+ // TODO currently predicate, subject and context constraints are filtered on the clients
+ return getIteratorWrapper(scanner);
+ }
+ // --
+ // -- END of Query functions. Next up, general stuff used by the queries above.
+ // --
+
+ /**
+ * Allows passing range specific logic into doQuery.
+ * Each query function implements an anonymous instance of this and calls it's doQuery().
+ */
+ abstract class Query {
+ abstract protected Range getRange(KeyParts keyParts);
+
+ public ScannerBase doQuery(final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException {
+ // key is contraintPrefix + time, or just time.
+ // Any constraints handled here, if the constraints are empty, the
+ // thisKeyParts.contraintPrefix will be null.
+ final List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints);
+ ScannerBase scanner = null;
+ if (keyParts.size() > 1) {
+ scanner = getBatchScanner();
+ } else {
+ scanner = getScanner();
+ }
+
+ final Collection<Range> ranges = new HashSet<Range>();
+ KeyParts lastKeyParts = null;
+ Range range = null;
+ for (final KeyParts thisKeyParts : keyParts) {
+ range = getRange(thisKeyParts);
+ ranges.add(range);
+ lastKeyParts = thisKeyParts;
+ }
+ //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq);
+ scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq));
+ if (scanner instanceof BatchScanner) {
+ ((BatchScanner) scanner).setRanges(ranges);
+ } else if (range != null) {
+ ((Scanner) scanner).setRange(range);
+ }
+ return scanner;
+ }
+ }
+
+ /**
* An iteration wrapper for a loaded scanner that is returned for each query above.
*
* @param scanner
@@ -637,16 +648,16 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
@Override
public Statement next() throws QueryEvaluationException {
- Entry<Key, Value> entry = i.next();
- Value v = entry.getValue();
+ final Entry<Key, Value> entry = i.next();
+ final Value v = entry.getValue();
try {
- String dataString = Text.decode(v.get(), 0, v.getSize());
- Statement s = StatementSerializer.readStatement(dataString);
+ final String dataString = Text.decode(v.get(), 0, v.getSize());
+ final Statement s = StatementSerializer.readStatement(dataString);
return s;
- } catch (CharacterCodingException e) {
+ } catch (final CharacterCodingException e) {
logger.error("Error decoding value=" + Arrays.toString(v.get()), e);
throw new QueryEvaluationException(e);
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.error("Error de-serializing statement, string=" + v.get(), e);
throw new QueryEvaluationException(e);
}
@@ -673,16 +684,17 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @return an anonymous object that will iterate the resulting statements from a given scanner.
* @throws QueryEvaluationException
*/
- private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementContraints constraints) {
- if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates())
- return getIteratorWrapper(scanner);
- return new ConstrainedIteratorWrapper(scanner) {
- @Override
- public boolean allowedBy(Statement statement) {
- return allowedByConstraints(statement, constraints);
- }
- };
- }
+ private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementConstraints constraints) {
+ if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) {
+ return getIteratorWrapper(scanner);
+ }
+ return new ConstrainedIteratorWrapper(scanner) {
+ @Override
+ public boolean allowedBy(final Statement statement) {
+ return allowedByConstraints(statement, constraints);
+ }
+ };
+ }
/**
* An iteration wrapper for a loaded scanner that is returned for queries above.
* Currently, this temporal index supports contexts only on the client, using this filter.
@@ -692,82 +704,86 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @return an anonymous object that will iterate the resulting statements from a given scanner.
* @throws QueryEvaluationException
*/
- private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) {
- if (context==null)
- return getIteratorWrapper(scanner);
- return new ConstrainedIteratorWrapper(scanner) {
- @Override
- public boolean allowedBy(Statement statement) {
- return allowedByContext(statement, context);
- }
- };
- }
- /**
- * Wrap a scanner in a iterator that will filter statements based on a boolean allowedBy().
- * If the allowedBy function returns false for the next statement, it is skipped.
- * This is used for to do client side, what the index cannot (yet) do on the server side.
- */
+ private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) {
+ if (context==null) {
+ return getIteratorWrapper(scanner);
+ }
+ return new ConstrainedIteratorWrapper(scanner) {
+ @Override
+ public boolean allowedBy(final Statement statement) {
+ return allowedByContext(statement, context);
+ }
+ };
+ }
+ /**
+ * Wrap a scanner in an iterator that will filter statements based on a boolean allowedBy().
+ * If the allowedBy function returns false for the next statement, it is skipped.
+ * This is used to do on the client side what the index cannot (yet) do on the server side.
+ */
abstract static class ConstrainedIteratorWrapper implements CloseableIteration<Statement, QueryEvaluationException> {
- private Statement nextStatement=null;
- private boolean isInitialized = false;
- final private Iterator<Entry<Key, Value>> i;
- final private ScannerBase scanner;
-
- ConstrainedIteratorWrapper(ScannerBase scanner) {
- this.scanner = scanner;
- i=scanner.iterator();
- }
+ private Statement nextStatement=null;
+ private boolean isInitialized = false;
+ final private Iterator<Entry<Key, Value>> i;
+ final private ScannerBase scanner;
+
+ ConstrainedIteratorWrapper(final ScannerBase scanner) {
+ this.scanner = scanner;
+ i=scanner.iterator();
+ }
@Override
public boolean hasNext() throws QueryEvaluationException {
- if (!isInitialized)
- internalGetNext();
- return (nextStatement != null) ;
+ if (!isInitialized) {
+ internalGetNext();
+ }
+ return (nextStatement != null) ;
}
@Override
public Statement next() throws QueryEvaluationException {
- if (nextStatement==null) {
- if (!isInitialized)
- internalGetNext();
- if (nextStatement==null)
- throw new NoSuchElementException();
- }
- // use this one, then get the next one loaded.
- Statement thisStatement = this.nextStatement;
- internalGetNext();
- return thisStatement;
+ if (nextStatement==null) {
+ if (!isInitialized) {
+ internalGetNext();
+ }
+ if (nextStatement==null) {
+ throw new NoSuchElementException();
+ }
+ }
+ // use this one, then get the next one loaded.
+ final Statement thisStatement = nextStatement;
+ internalGetNext();
+ return thisStatement;
}
- /**
- * Gets the next statement meeting constraints and stores in nextStatement.
- * Sets null when all done, or on exception.
- * @throws QueryEvaluationException
- */
- private void internalGetNext()
- throws QueryEvaluationException {
- isInitialized=true;
- this.nextStatement = null; // Default on done or error.
- Statement statement = null;
- while (i.hasNext()) {
- Entry<Key, Value> entry = i.next();
- Value v = entry.getValue();
- try {
- String dataString = Text.decode(v.get(), 0, v.getSize());
- statement = StatementSerializer.readStatement(dataString);
- } catch (CharacterCodingException e) {
- logger.error("Error decoding value=" + Arrays.toString(v.get()), e);
- throw new QueryEvaluationException(e);
- } catch (IOException e) {
- logger.error("Error de-serializing statement, string=" + v.get(), e);
- throw new QueryEvaluationException(e);
- }
- if (allowedBy(statement)) {
- this.nextStatement = statement;
- return;
- }
- }
- }
- public abstract boolean allowedBy(Statement s);
+ /**
+ * Gets the next statement meeting constraints and stores in nextStatement.
+ * Sets null when all done, or on exception.
+ * @throws QueryEvaluationException
+ */
+ private void internalGetNext()
+ throws QueryEvaluationException {
+ isInitialized=true;
+ nextStatement = null; // Default on done or error.
+ Statement statement = null;
+ while (i.hasNext()) {
+ final Entry<Key, Value> entry = i.next();
+ final Value v = entry.getValue();
+ try {
+ final String dataString = Text.decode(v.get(), 0, v.getSize());
+ statement = StatementSerializer.readStatement(dataString);
+ } catch (final CharacterCodingException e) {
+ logger.error("Error decoding value=" + Arrays.toString(v.get()), e);
+ throw new QueryEvaluationException(e);
+ } catch (final IOException e) {
+ logger.error("Error de-serializing statement, string=" + v.get(), e);
+ throw new QueryEvaluationException(e);
+ }
+ if (allowedBy(statement)) {
+ nextStatement = statement;
+ return;
+ }
+ }
+ }
+ public abstract boolean allowedBy(Statement s);
@Override
public void remove() {
@@ -786,35 +802,39 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* @param contraints fields that are non-null must match the statement's components, otherwise it is not allowed.
* @return true if the parts of the statement match the statementConstraints' parts.
*/
- protected static boolean allowedByConstraints(Statement statement, StatementContraints constraints) {
-
- if (constraints.hasSubject() && ! constraints.getSubject().toString().equals(statement.getSubject().toString()))
- {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;}
- //return false;
-
- if (! allowedByContext(statement, constraints.getContext()))
- return false;
- //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;}
-
- if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate()))
- return false;
- //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;}
-
- System.out.println("allow statement: "+ statement.toString());
- return true;
- }
-
- /**
- * Allow only if the context matches the statement. This is a client side filter.
- * @param statement
- * @param context
- * @return
- */
- protected static boolean allowedByContext(Statement statement, Resource context) {
- return context==null || context.equals( statement.getContext() );
- }
-
- @Override
+ protected static boolean allowedByConstraints(final Statement statement, final StatementConstraints constraints) {
+
+ if (constraints.hasSubject() && ! constraints.getSubject().toString().equals(statement.getSubject().toString()))
+ {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;}
+ //return false;
+
+ if (! allowedByContext(statement, constraints.getContext()))
+ {
+ return false;
+ //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;}
+ }
+
+ if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate()))
+ {
+ return false;
+ //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;}
+ }
+
+ System.out.println("allow statement: "+ statement.toString());
+ return true;
+ }
+
+ /**
+ * Allow only if the context matches the statement. This is a client side filter.
+ * @param statement
+ * @param context
+ * @return
+ */
+ protected static boolean allowedByContext(final Statement statement, final Resource context) {
+ return context==null || context.equals( statement.getContext() );
+ }
+
+ @Override
public Set<URI> getIndexablePredicates() {
return validPredicates;
@@ -829,8 +849,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
public void flush() throws IOException {
try {
mtbw.flush();
- } catch (MutationsRejectedException e) {
- String msg = "Error while flushing the batch writer.";
+ } catch (final MutationsRejectedException e) {
+ final String msg = "Error while flushing the batch writer.";
logger.error(msg, e);
throw new IOException(msg, e);
}
@@ -847,8 +867,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
mtbw.close();
- } catch (MutationsRejectedException e) {
- String msg = "Error while closing the batch writer.";
+ } catch (final MutationsRejectedException e) {
+ final String msg = "Error while closing the batch writer.";
logger.error(msg, e);
throw new IOException(msg, e);
}
@@ -861,13 +881,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
return ConfigUtils.getTemporalTableName(conf);
}
- private void deleteStatement(Statement statement) throws IOException, IllegalArgumentException {
+ private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException {
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
- boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
- if (!isValidPredicate || !(statement.getObject() instanceof Literal))
+ final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
+ if (!isValidPredicate || !(statement.getObject() instanceof Literal)) {
return;
- DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
+ }
+ final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
extractDateTime(statement, indexDateTimes);
if (indexDateTimes[0] == null) {
return;
@@ -876,49 +897,49 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
// Remove this as an instant, or interval.
try {
if (indexDateTimes[1] != null) {
- TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1]));
+ final TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1]));
removeInterval(temporalIndexBatchWriter, interval, statement);
} else {
- TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]);
+ final TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]);
removeInstant(temporalIndexBatchWriter, instant, statement);
}
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
throw new IOException("While adding interval/instant for statement =" + statement, e);
}
}
@Override
- public void deleteStatement(RyaStatement statement) throws IllegalArgumentException, IOException {
+ public void deleteStatement(final RyaStatement statement) throws IllegalArgumentException, IOException {
deleteStatement(RyaToRdfConversions.convertStatement(statement));
}
- @Override
- public void init() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setConnector(Connector connector) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void destroy() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void purge(RdfCloudTripleStoreConfiguration configuration) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void dropAndDestroy() {
- // TODO Auto-generated method stub
-
- }
+ @Override
+ public void init() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setConnector(final Connector connector) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void destroy() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void dropAndDestroy() {
+ // TODO Auto-generated method stub
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java
deleted file mode 100644
index a69a79f..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- *
- */
-package mvm.rya.indexing.accumulo.temporal;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import mvm.rya.indexing.TemporalInstant;
-import mvm.rya.indexing.TemporalInterval;
-
-import org.apache.commons.codec.binary.StringUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * Immutable date and time instance returning a human readable key.
- * Preserves the Time zone, but not stored in the key.
- * Converts fields (hours, etc) correctly for tz=Zulu when stored,
- * so the original timezone is not preserved when retrieved.
- *
- * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset
- * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt
- *
- * Limits: All dates and times are assumed to be in the "current era", no BC,
- * somewhere between 0000AD and 9999AD.
- *
- * Resolution: to the second, or millisecond if the optional fraction is used.
- *
- * This is really a wrapper for Joda DateTime. if you need functionality from
- * that wonderful class, simply use t.getAsDateTime().
- *
- */
-public class TemporalInstantRfc3339 implements TemporalInstant {
-
- private static final long serialVersionUID = -7790000399142290309L;
-
- private final DateTime dateTime;
- /**
- * Format key like this: YYYY-MM-DDThh:mm:ssZ
- */
- public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis();
-
- /**
- * New date assumed UTC time zone.
- *
- * @param year
- * @param month
- * @param day
- * @param hour
- * @param minute
- * @param second
- */
- public TemporalInstantRfc3339(int year, int month, int day, int hour, int minute, int second) {
- dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC);
- }
-
- /**
- * Construct with a Joda/java v8 DateTime;
- * TZ is preserved, but not in the key.
- *
- * @param dateTime
- * initialize with this date time. Converted to zulu time zone for key generation.
- * @return
- */
- public TemporalInstantRfc3339(DateTime datetime) {
- this.dateTime = datetime;
- }
- /**
- * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}.
- * beginning must be less than end.
- *
- * @param dateTimeInterval String in the form [dateTime1,dateTime2]
- */
- public static TemporalInterval parseInterval(String dateTimeInterval) {
-
- Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(dateTimeInterval);
- if (matcher.find()) {
- // Got a date time pair, parse into an interval.
- return new TemporalInterval(
- new TemporalInstantRfc3339(new DateTime(matcher.group(1))),
- new TemporalInstantRfc3339(new DateTime(matcher.group(2))));
- }
- throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval);
- }
-
- /**
- * if this is older returns -1, equal 0, else 1
- *
- */
- @Override
- public int compareTo(TemporalInstant that) {
- return this.getAsKeyString().compareTo(that.getAsKeyString());
- }
-
- @Override
- public byte[] getAsKeyBytes() {
- return StringUtils.getBytesUtf8(getAsKeyString());
- }
-
- @Override
- public String getAsKeyString() {
- return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER);
- }
-
- /**
- * Readable string, formated local time at {@link DateTimeZone}.
- * If the timezone is UTC (Z), it was probably a key from the database.
- * If the server and client are in different Time zone, should probably use the client timezone.
- *
- * Time at specified time zone:
- * instant.getAsReadable(DateTimeZone.forID("-05:00")));
- * instant.getAsReadable(DateTimeZone.getDefault()));
- *
- * Use original time zone set in the constructor:
- * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER));
- *
- */
- @Override
- public String getAsReadable(DateTimeZone dateTimeZone) {
- return dateTime.withZone(dateTimeZone).toString(FORMATTER);
- }
-
- /**
- * Use original time zone set in the constructor, or UTC if from parsing the key.
- */
- @Override
- public String getAsReadable() {
- return dateTime.toString(FORMATTER);
- }
-
- /**
- * default toString, same as getAsReadable().
- */
- @Override
- public String toString() {
- return getAsReadable();
- }
-
- /**
- * Show readable time converted to the default timezone.
- */
- @Override
- public DateTime getAsDateTime() {
- return dateTime;
- }
-
- /**
- * Minimum Date, used for infinitely past.
- */
- private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE));
- /**
- * maximum date/time is used for infinitely in the future.
- */
- private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE));
-
- /**
- * infinite past date.
- * @return an instant that will compare as NEWER than anything but itself.
- */
- public static TemporalInstant getMinimumInstance() {
- return MINIMUM;
- }
- /**
- * infinite future date.
- * @return an instant that will compare as OLDER than anything but itself
- */
-
- public static TemporalInstant getMaximumInstance() {
- return MAXIMUM;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- return this.getAsKeyString().hashCode();
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj;
- return (this.getAsKeyString().equals(other.getAsKeyString()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java
deleted file mode 100644
index f2ed8c4..0000000
--- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java
+++ /dev/null
@@ -1,320 +0,0 @@
-package mvm.rya.indexing.accumulo.temporal;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Map;
-import java.util.Set;
-
-import mvm.rya.indexing.IndexingExpr;
-import mvm.rya.indexing.IteratorFactory;
-import mvm.rya.indexing.SearchFunction;
-import mvm.rya.indexing.SearchFunctionFactory;
-import mvm.rya.indexing.StatementContraints;
-import mvm.rya.indexing.TemporalIndexer;
-import mvm.rya.indexing.TemporalInstant;
-import mvm.rya.indexing.TemporalInterval;
-import mvm.rya.indexing.accumulo.geo.GeoTupleSet;
-import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.joda.time.DateTime;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.QueryModelVisitor;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-
-//Indexing Node for temporal expressions to be inserted into execution plan
-//to delegate temporal portion of query to temporal index
-public class TemporalTupleSet extends ExternalTupleSet {
-
- private Configuration conf;
- private TemporalIndexer temporalIndexer;
- private IndexingExpr filterInfo;
-
-
- public TemporalTupleSet(IndexingExpr filterInfo, TemporalIndexer temporalIndexer) {
- this.filterInfo = filterInfo;
- this.temporalIndexer = temporalIndexer;
- this.conf = temporalIndexer.getConf();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Set<String> getBindingNames() {
- return filterInfo.getBindingNames();
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Note that we need a deep copy for everything that (during optimizations)
- * can be altered via {@link #visitChildren(QueryModelVisitor)}
- */
- public TemporalTupleSet clone() {
- return new TemporalTupleSet(filterInfo, temporalIndexer);
- }
-
- @Override
- public double cardinality() {
- return 0.0; // No idea how the estimate cardinality here.
- }
-
-
- @Override
- public String getSignature() {
-
- return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " ");
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof TemporalTupleSet)) {
- return false;
- }
- TemporalTupleSet arg = (TemporalTupleSet) other;
- return this.filterInfo.equals(arg.filterInfo);
- }
-
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31*result + filterInfo.hashCode();
-
- return result;
- }
-
-
- /**
- * Returns an iterator over the result set associated with contained IndexingExpr.
- * <p>
- * Should be thread-safe (concurrent invocation {@link OfflineIterable} this
- * method can be expected with some query evaluators.
- */
- @Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
- throws QueryEvaluationException {
-
-
- URI funcURI = filterInfo.getFunction();
- SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI);
-
- if(filterInfo.getArguments().length > 1) {
- throw new IllegalArgumentException("Index functions do not support more than two arguments.");
- }
-
- String queryText = filterInfo.getArguments()[0].stringValue();
-
- return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
- }
-
-
- //returns appropriate search function for a given URI
- //search functions used by TemporalIndexer to query Temporal Index
- private class TemporalSearchFunctionFactory {
-
- private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
- Configuration conf;
-
- public TemporalSearchFunctionFactory(Configuration conf) {
- this.conf = conf;
- }
-
-
- /**
- * Get a {@link TemporalSearchFunction} for a give URI.
- *
- * @param searchFunction
- * @return
- */
- public SearchFunction getSearchFunction(final URI searchFunction) {
-
- SearchFunction geoFunc = null;
-
- try {
- geoFunc = getSearchFunctionInternal(searchFunction);
- } catch (QueryEvaluationException e) {
- e.printStackTrace();
- }
-
- return geoFunc;
- }
-
- private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException {
- SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
-
- if (sf != null) {
- return sf;
- } else {
- throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
- }
-
-
- }
-
-
-
- private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantAfterInstant";
- };
- };
- private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantBeforeInstant";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
- return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantEqualsInstant";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantAfterInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantBeforeInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantInsideInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantHasBeginningInterval";
- };
- };
-
- private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() {
- @Override
- public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms,
- StatementContraints contraints) throws QueryEvaluationException {
- TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
- return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints);
- }
-
- @Override
- public String toString() {
- return "TEMPORAL_InstantHasEndInterval";
- };
- };
-
- {
-
- String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
-
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant);
-
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"),
- TEMPORAL_InstantHasBeginningInterval);
- SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval);
-
- }
-
-
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
index cce0a81..6aaf2c4 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
@@ -34,6 +36,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException;
+import org.openrdf.model.URI;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
@@ -114,7 +117,7 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
@Override
public Configuration getConf() {
- return this.conf.get();
+ return conf.get();
}
/**
@@ -252,4 +255,9 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer {
log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented.");
return null;
}
+
+    /**
+     * @return a new, empty, mutable {@link Set} on every call.
+     */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        final Set<URI> noPredicates = new HashSet<>();
+        return noPredicates;
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
index 8d7b180..ed202f4 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
@@ -20,6 +20,12 @@ package mvm.rya.indexing.external.fluo;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_INSTANCE;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_PASSWORD;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERNAME;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME;
+import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY;
import javax.annotation.ParametersAreNonnullByDefault;
@@ -67,13 +73,13 @@ public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> {
final FluoPcjUpdaterConfig fluoUpdaterConfig = new FluoPcjUpdaterConfig( indexerConfig.getConfig() );
// Make sure the required values are present.
- checkArgument(fluoUpdaterConfig.getFluoAppName().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.FLUO_APP_NAME);
- checkArgument(fluoUpdaterConfig.getFluoZookeepers().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS);
- checkArgument(fluoUpdaterConfig.getAccumuloZookeepers().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS);
- checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_INSTANCE);
- checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_USERNAME);
- checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.ACCUMULO_PASSWORD);
- checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + FluoPcjUpdaterConfig.STATEMENT_VISIBILITY);
+ checkArgument(fluoUpdaterConfig.getFluoAppName().isPresent(), "Missing configuration: " + FLUO_APP_NAME);
+ checkArgument(fluoUpdaterConfig.getFluoZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS);
+ checkArgument(fluoUpdaterConfig.getAccumuloZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS);
+ checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + ACCUMULO_INSTANCE);
+ checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + ACCUMULO_USERNAME);
+ checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + ACCUMULO_PASSWORD);
+ checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + STATEMENT_VISIBILITY);
// Fluo configuration values.
final FluoConfiguration fluoClientConfig = new FluoConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java
index 5f2e700..078cbd2 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -1,5 +1,7 @@
package mvm.rya.indexing.mongodb;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -8,9 +10,9 @@ package mvm.rya.indexing.mongodb;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,49 +21,170 @@ package mvm.rya.indexing.mongodb;
* under the License.
*/
-
import java.io.IOException;
import java.util.Collection;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.QueryBuilder;
+import info.aduna.iteration.CloseableIteration;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.indexing.StatementConstraints;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
-import org.apache.hadoop.conf.Configuration;
+/**
+ * Secondary Indexer using MongoDB
+ * @param <T> - The {@link IndexingMongoDBStorageStrategy} this indexer uses.
+ */
+public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrategy> implements RyaSecondaryIndexer {
+ private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);
+
+ private boolean isInit = false;
+ protected Configuration conf;
+ protected MongoClient mongoClient;
+ protected String dbName;
+ protected DB db;
+ protected DBCollection collection;
+ protected Set<URI> predicates;
+
+ protected T storageStrategy;
-public abstract class AbstractMongoIndexer implements RyaSecondaryIndexer {
+    /**
+     * Builds the indexer around an existing MongoDB connection.
+     * Only the client reference is kept here; the database and collection
+     * handles are resolved later by {@link #init()}.
+     * @param mongoClient The {@link MongoClient} to use with this indexer; validated with {@code checkNotNull}.
+     */
+    public AbstractMongoIndexer(final MongoClient mongoClient) {
+        checkNotNull(mongoClient);
+        this.mongoClient = mongoClient;
+    }
+
+    /**
+     * Resolves the Mongo database and collection handles from the current configuration.
+     * The collection name is the configured prefix ("rya" when unset) followed by
+     * {@link #getCollectionName()}.
+     */
+    protected void init() throws NumberFormatException, IOException{
+        dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME);
+        db = mongoClient.getDB(dbName);
+        final String collectionPrefix = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya");
+        collection = db.getCollection(collectionPrefix + getCollectionName());
+    }
+
+    /**
+     * Stores the configuration and, the first time it succeeds, runs {@link #init()}.
+     * Later calls only replace the stored configuration reference.
+     * @param conf the configuration to use
+     * @throws RuntimeException wrapping the failure when {@link #init()} throws
+     */
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = conf;
+        if (isInit) {
+            // Already initialised: nothing else to do.
+            return;
+        }
+        try {
+            init();
+            isInit = true;
+        } catch (final NumberFormatException | IOException e) {
+            LOG.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
+            throw new RuntimeException(e);
+        }
+    }
@Override
public void close() throws IOException {
+ mongoClient.close();
}
@Override
public void flush() throws IOException {
}
-
@Override
public Configuration getConf() {
- return null;
+ return conf;
}
-
+
@Override
public String getTableName() {
- return null;
+ return dbName;
}
@Override
- public void storeStatements(Collection<RyaStatement> ryaStatements)
+ public Set<URI> getIndexablePredicates() {
+ return predicates;
+ }
+
+    /**
+     * Removes from the index collection the documents matched by the storage
+     * strategy's query for the given statement.
+     * @param stmt the statement whose index entry is removed
+     */
+    @Override
+    public void deleteStatement(final RyaStatement stmt) throws IOException {
+        collection.remove(storageStrategy.getQuery(stmt));
+    }
+
+ @Override
+ public void storeStatements(final Collection<RyaStatement> ryaStatements)
throws IOException {
- for (RyaStatement ryaStatement : ryaStatements){
+ for (final RyaStatement ryaStatement : ryaStatements){
storeStatement(ryaStatement);
}
-
}
@Override
- public void dropGraph(RyaURI... graphs) {
+    public void storeStatement(final RyaStatement ryaStatement) throws IOException {
+        // Indexes one statement. It is stored only when its object is a Literal and
+        // its predicate is accepted: an empty predicate set accepts every predicate.
+        // A statement that cannot be converted is logged and skipped (best effort).
+        try {
+            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+            final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate());
+            if (isValidPredicate && (statement.getObject() instanceof Literal)) {
+                final DBObject obj = storageStrategy.serialize(ryaStatement);
+                if (obj != null) {
+                    // Select the target document with the statement's lookup query (the same
+                    // query deleteStatement uses), not with the serialized document itself.
+                    final DBObject query = storageStrategy.getQuery(ryaStatement);
+                    // upsert = true, multi = false
+                    collection.update(query, obj, true, false);
+                }
+            }
+        } catch (final IllegalArgumentException e) {
+            // Keep the cause in the log so the parse failure can be diagnosed.
+            LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
+        }
+    }
+
+ @Override
+ public void dropGraph(final RyaURI... graphs) {
throw new UnsupportedOperationException();
}
+    /**
+     * Queries the index collection for documents that satisfy both the supplied
+     * pre-built constraints and the storage strategy's query for {@code constraints}.
+     * @param constraints statement constraints translated by the storage strategy
+     * @param preConstraints an already-built query fragment to AND with them
+     * @return a closeable iteration over the matching statements
+     */
+    protected CloseableIteration<Statement, QueryEvaluationException> withConstraints(final StatementConstraints constraints, final DBObject preConstraints) {
+        final QueryBuilder builder = QueryBuilder.start().and(preConstraints);
+        builder.and(storageStrategy.getQuery(constraints));
+        return closableIterationFromCursor(builder.get());
+    }
+
+    /**
+     * Runs {@code dbo} against the index collection and adapts the resulting cursor
+     * to a {@link CloseableIteration} of statements. Closing the iteration closes the cursor.
+     */
+    private CloseableIteration<Statement, QueryEvaluationException> closableIterationFromCursor(final DBObject dbo) {
+        final DBCursor results = collection.find(dbo);
+        return new CloseableIteration<Statement, QueryEvaluationException>() {
+            @Override
+            public void close() throws QueryEvaluationException {
+                results.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return results.hasNext();
+            }
+
+            @Override
+            public Statement next() throws QueryEvaluationException {
+                // Each document is deserialized by the storage strategy, then converted to an RDF statement.
+                final DBObject document = results.next();
+                return RyaToRdfConversions.convertStatement(storageStrategy.deserializeDBObject(document));
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Remove not implemented");
+            }
+        };
+    }
+
+ /**
+ * Supplies the collection-name suffix for this indexer. {@link #init()} appends it to the
+ * configured collection prefix ("rya" when unset) to obtain the {@link DBCollection}
+ * that this indexer queries, updates and removes from.
+ * @return The name of the {@link DBCollection} to use with the storage strategy.
+ */
+ public abstract String getCollectionName();
}