You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/15 20:06:51 UTC
[31/69] [abbrv] [partial] incubator-rya git commit: RYA-198 Renaming
Files
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java
deleted file mode 100644
index 282ecbb..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.mongodb.instance;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map.Entry;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.mongodb.BasicDBList;
-import com.mongodb.BasicDBObject;
-import com.mongodb.BasicDBObjectBuilder;
-import com.mongodb.DBObject;
-
-import mvm.rya.api.instance.RyaDetails;
-import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
-import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails;
-import mvm.rya.api.instance.RyaDetails.GeoIndexDetails;
-import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails;
-import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
-import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
-import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
-import mvm.rya.api.instance.RyaDetails.ProspectorDetails;
-import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails;
-
-/**
- * Serializes configuration details for use in Mongo.
- * The {@link DBObject} will look like:
- * <pre>
- * {@code
- * {
- * "instanceName": <string>,
- * "version": <string>?,
- * "entityCentricDetails": <boolean>,
- * "geoDetails": <boolean>,
- * "pcjDetails": {
- * "enabled": <boolean>,
- * "fluoName": <string>,
- * "pcjs": [{
- * "id": <string>,
- * "updateStrategy": <string>,
- * "lastUpdate": <date>
- * },...,{}
- * ]
- * },
- * "temporalDetails": <boolean>,
- * "freeTextDetails": <boolean>,
- * "prospectorDetails": <date>,
- * "joinSelectivityDetails": <date>
- * }
- * </pre>
- */
-@ParametersAreNonnullByDefault
-public class MongoDetailsAdapter {
- public static final String INSTANCE_KEY = "instanceName";
- public static final String VERSION_KEY = "version";
-
- public static final String ENTITY_DETAILS_KEY = "entityCentricDetails";
- public static final String GEO_DETAILS_KEY = "geoDetails";
- public static final String PCJ_DETAILS_KEY = "pcjDetails";
- public static final String PCJ_ENABLED_KEY = "enabled";
- public static final String PCJ_FLUO_KEY = "fluoName";
- public static final String PCJ_PCJS_KEY = "pcjs";
- public static final String PCJ_ID_KEY = "id";
- public static final String PCJ_UPDATE_STRAT_KEY = "updateStrategy";
- public static final String PCJ_LAST_UPDATE_KEY = "lastUpdate";
- public static final String TEMPORAL_DETAILS_KEY = "temporalDetails";
- public static final String FREETEXT_DETAILS_KEY = "freeTextDetails";
-
- public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
- public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
-
- /**
- * Serializes {@link RyaDetails} to mongo {@link DBObject}.
- * @param details - The details to be serialized.
- * @return The mongo {@link DBObject}.
- */
- public static BasicDBObject toDBObject(final RyaDetails details) {
- Preconditions.checkNotNull(details);
- final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
- .add(INSTANCE_KEY, details.getRyaInstanceName())
- .add(VERSION_KEY, details.getRyaVersion())
- .add(ENTITY_DETAILS_KEY, details.getEntityCentricIndexDetails().isEnabled())
- .add(GEO_DETAILS_KEY, details.getGeoIndexDetails().isEnabled())
- .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
- .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
- .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
- if(details.getProspectorDetails().getLastUpdated().isPresent()) {
- builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
- }
- if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
- builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
- }
- return (BasicDBObject) builder.get();
- }
-
- private static DBObject toDBObject(final PCJIndexDetails pcjIndexDetails) {
- requireNonNull(pcjIndexDetails);
-
- final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
-
- // Is Enabled
- builder.add(PCJ_ENABLED_KEY, pcjIndexDetails.isEnabled());
-
- // Fluo Details if present.
- if(pcjIndexDetails.getFluoDetails().isPresent()) {
- builder.add(PCJ_FLUO_KEY, pcjIndexDetails.getFluoDetails().get().getUpdateAppName());
- }
-
- // Add the PCJDetail objects.
- final List<DBObject> pcjDetailsList = new ArrayList<>();
- for(final PCJDetails pcjDetails : pcjIndexDetails.getPCJDetails().values()) {
- pcjDetailsList.add( toDBObject( pcjDetails ) );
- }
- builder.add(PCJ_PCJS_KEY, pcjDetailsList.toArray());
-
- return builder.get();
- }
-
- static DBObject toDBObject(final PCJDetails pcjDetails) {
- requireNonNull(pcjDetails);
-
- final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
-
- // PCJ ID
- builder.add(PCJ_ID_KEY, pcjDetails.getId());
-
- // PCJ Update Strategy if present.
- if(pcjDetails.getUpdateStrategy().isPresent()) {
- builder.add(PCJ_UPDATE_STRAT_KEY, pcjDetails.getUpdateStrategy().get().name());
- }
-
- // Last Update Time if present.
- if(pcjDetails.getLastUpdateTime().isPresent()) {
- builder.add(PCJ_LAST_UPDATE_KEY, pcjDetails.getLastUpdateTime().get());
- }
-
- return builder.get();
- }
-
- public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
- final BasicDBObject basicObj = (BasicDBObject) mongoObj;
- try {
- return RyaDetails.builder()
- .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
- .setRyaVersion(basicObj.getString(VERSION_KEY))
- .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
- .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
- .setPCJIndexDetails(getPCJIndexDetails(basicObj))
- .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
- .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
- .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
- .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
- .build();
- } catch(final Exception e) {
- throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
- }
- }
-
- private static PCJIndexDetails.Builder getPCJIndexDetails(final BasicDBObject basicObj) {
- final BasicDBObject pcjIndexDBO = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY);
-
- final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder()
- .setEnabled(pcjIndexDBO.getBoolean(PCJ_ENABLED_KEY))
- .setFluoDetails(new FluoDetails(pcjIndexDBO.getString(PCJ_FLUO_KEY)));
-
- final BasicDBList pcjs = (BasicDBList) pcjIndexDBO.get(PCJ_PCJS_KEY);
- if(pcjs != null) {
- for(int ii = 0; ii < pcjs.size(); ii++) {
- final BasicDBObject pcj = (BasicDBObject) pcjs.get(ii);
- pcjBuilder.addPCJDetails( toPCJDetails(pcj) );
- }
- }
- return pcjBuilder;
- }
-
- static PCJDetails.Builder toPCJDetails(final BasicDBObject dbo) {
- requireNonNull(dbo);
-
- // PCJ ID.
- final PCJDetails.Builder builder = PCJDetails.builder()
- .setId( dbo.getString(PCJ_ID_KEY) );
-
- // PCJ Update Strategy if present.
- if(dbo.containsField(PCJ_UPDATE_STRAT_KEY)) {
- builder.setUpdateStrategy( PCJUpdateStrategy.valueOf( dbo.getString(PCJ_UPDATE_STRAT_KEY) ) );
- }
-
- // Last Update Time if present.
- if(dbo.containsField(PCJ_LAST_UPDATE_KEY)) {
- builder.setLastUpdateTime( dbo.getDate(PCJ_LAST_UPDATE_KEY) );
- }
-
- return builder;
- }
-
- /**
- * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
- * to adapt it into a {@link RyaDetails}.
- */
- public static class MalformedRyaDetailsException extends Exception {
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates a new {@link MalformedRyaDetailsException}
- * @param message - The message to be displayed by the exception.
- * @param e - The source cause of the exception.
- */
- public MalformedRyaDetailsException(final String message, final Throwable e) {
- super(message, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java
deleted file mode 100644
index dfefa8f..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoRyaInstanceDetailsRepository.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package mvm.rya.mongodb.instance;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import com.mongodb.BasicDBObject;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
-import com.mongodb.MongoClient;
-import com.mongodb.WriteResult;
-
-import mvm.rya.api.instance.RyaDetails;
-import mvm.rya.api.instance.RyaDetailsRepository;
-import mvm.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
-
-/**
- * An implementation of {@link RyaDetailsRepository} that stores a Rya
- * instance's {@link RyaDetails} in a Mongo document.
- */
-@ParametersAreNonnullByDefault
-public class MongoRyaInstanceDetailsRepository implements RyaDetailsRepository {
- private static final String INSTANCE_DETAILS_COLLECTION_NAME = "instance_details";
-
- private final DB db;
- private final String instanceName;
-
- /**
- * Constructs an instance of {@link MongoRyaInstanceDetailsRepository}.
- *
- * @param client - Connects to the instance of Mongo that hosts the Rya instance. (not null)
- * @param instanceName - The name of the Rya instance this repository represents. (not null)
- */
- public MongoRyaInstanceDetailsRepository(final MongoClient client, final String instanceName) {
- checkNotNull(client);
- this.instanceName = requireNonNull( instanceName );
- db = client.getDB(this.instanceName);
- }
-
- @Override
- public boolean isInitialized() throws RyaDetailsRepositoryException {
- final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME);
- return col.count() == 1;
- }
-
- @Override
- public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException {
- // Preconditions.
- requireNonNull( details );
-
- if(!details.getRyaInstanceName().equals( instanceName )) {
- throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " +
- "the instance name that this repository is connected to. Make sure you're connected to the" +
- "correct Rya instance.");
- }
-
- if(isInitialized()) {
- throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" +
- instanceName + "'.");
- }
-
- // Create the document that hosts the details if it has not been created yet.
- final DBCollection col = db.createCollection(INSTANCE_DETAILS_COLLECTION_NAME, new BasicDBObject());
-
- // Write the details to the collection.
- col.insert(MongoDetailsAdapter.toDBObject(details));
- }
-
- @Override
- public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException {
- // Preconditions.
- if(!isInitialized()) {
- throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" +
- instanceName + "' because it has not been initialized yet.");
- }
-
- // Fetch the value from the collection.
- final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME);
- //There should only be one document in the collection.
- final DBObject mongoObj = col.findOne();
-
- try{
- // Deserialize it.
- return MongoDetailsAdapter.toRyaDetails( mongoObj );
- } catch (final MalformedRyaDetailsException e) {
- throw new RyaDetailsRepositoryException("The existing details details are malformed.", e);
- }
- }
-
- @Override
- public void update(final RyaDetails oldDetails, final RyaDetails newDetails)
- throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException {
- // Preconditions.
- requireNonNull(oldDetails);
- requireNonNull(newDetails);
-
- if(!newDetails.getRyaInstanceName().equals( instanceName )) {
- throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " +
- "the instance name that this repository is connected to. Make sure you're connected to the" +
- "correct Rya instance.");
- }
-
- if(!isInitialized()) {
- throw new NotInitializedException("Could not update the details for the Rya instanced named '" +
- instanceName + "' because it has not been initialized yet.");
- }
-
- if(oldDetails.equals(newDetails)) {
- return;
- }
-
- final DBCollection col = db.getCollection(INSTANCE_DETAILS_COLLECTION_NAME);
- final DBObject oldObj = MongoDetailsAdapter.toDBObject(oldDetails);
- final DBObject newObj = MongoDetailsAdapter.toDBObject(newDetails);
- final WriteResult result = col.update(oldObj, newObj);
-
- //since there is only 1 document, there should only be 1 update.
- if(result.getN() != 1) {
- throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" +
- instanceName + "' because the old value is out of date.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
deleted file mode 100644
index ba37ca1..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package mvm.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Iterator;
-
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-
-public class NonCloseableRyaStatementCursorIterator implements Iterator<RyaStatement> {
-
- RyaStatementCursorIterator iterator;
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public RyaStatement next() {
- return iterator.next();
- }
-
- public NonCloseableRyaStatementCursorIterator(
- RyaStatementCursorIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void remove() {
- try {
- iterator.remove();
- } catch (RyaDAOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
deleted file mode 100644
index d24cbdc..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package mvm.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
-
-import org.openrdf.query.BindingSet;
-
-import com.google.common.collect.Multimap;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-
-public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
-
- private DBCollection coll;
- private Multimap<DBObject, BindingSet> rangeMap;
- private Iterator<DBObject> queryIterator;
- private Long maxResults;
- private DBCursor resultCursor;
- private RyaStatement currentStatement;
- private Collection<BindingSet> currentBindingSetCollection;
- private Iterator<BindingSet> currentBindingSetIterator;
- private MongoDBStorageStrategy strategy;
-
- public RyaStatementBindingSetCursorIterator(DBCollection coll,
- Multimap<DBObject, BindingSet> rangeMap, MongoDBStorageStrategy strategy) {
- this.coll = coll;
- this.rangeMap = rangeMap;
- this.queryIterator = rangeMap.keySet().iterator();
- this.strategy = strategy;
- }
-
- @Override
- public boolean hasNext() {
- if (!currentBindingSetIteratorIsValid()) {
- findNextResult();
- }
- return currentBindingSetIteratorIsValid();
- }
-
- @Override
- public Entry<RyaStatement, BindingSet> next() {
- if (!currentBindingSetIteratorIsValid()) {
- findNextResult();
- }
- if (currentBindingSetIteratorIsValid()) {
- BindingSet currentBindingSet = currentBindingSetIterator.next();
- return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentStatement, currentBindingSet);
- }
- return null;
- }
-
- private boolean currentBindingSetIteratorIsValid() {
- return (currentBindingSetIterator != null) && currentBindingSetIterator.hasNext();
- }
-
- private void findNextResult() {
- if (!currentResultCursorIsValid()) {
- findNextValidResultCursor();
- }
- if (currentResultCursorIsValid()) {
- // convert to Rya Statement
- DBObject queryResult = resultCursor.next();
- currentStatement = strategy.deserializeDBObject(queryResult);
- currentBindingSetIterator = currentBindingSetCollection.iterator();
- }
- }
-
- private void findNextValidResultCursor() {
- while (queryIterator.hasNext()){
- DBObject currentQuery = queryIterator.next();
- resultCursor = coll.find(currentQuery);
- currentBindingSetCollection = rangeMap.get(currentQuery);
- if (resultCursor.hasNext()) return;
- }
- }
-
- private boolean currentResultCursorIsValid() {
- return (resultCursor != null) && resultCursor.hasNext();
- }
-
-
- public void setMaxResults(Long maxResults) {
- this.maxResults = maxResults;
- }
-
- @Override
- public void close() throws RyaDAOException {
- // TODO don't know what to do here
- }
-
- @Override
- public void remove() throws RyaDAOException {
- next();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
deleted file mode 100644
index 83bd2d4..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package mvm.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-
-import org.calrissian.mango.collect.CloseableIterable;
-import org.calrissian.mango.collect.CloseableIterator;
-import org.openrdf.query.BindingSet;
-
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-
-public class RyaStatementCursorIterable implements CloseableIterable<RyaStatement> {
-
-
- private NonCloseableRyaStatementCursorIterator iterator;
-
- public RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<RyaStatement> iterator() {
- // TODO Auto-generated method stub
- return iterator;
- }
-
- @Override
- public void closeQuietly() {
- //TODO don't know what to do here
- }
-
- @Override
- public void close() throws IOException {
- // TODO Auto-generated method stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
deleted file mode 100644
index 8df2c60..0000000
--- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementCursorIterator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package mvm.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
-
-import org.calrissian.mango.collect.CloseableIterable;
-import org.openrdf.query.BindingSet;
-
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-
-public class RyaStatementCursorIterator implements CloseableIteration<RyaStatement, RyaDAOException> {
-
- private DBCollection coll;
- private Iterator<DBObject> queryIterator;
- private DBCursor currentCursor;
- private MongoDBStorageStrategy strategy;
- private Long maxResults;
-
- public RyaStatementCursorIterator(DBCollection coll, Set<DBObject> queries, MongoDBStorageStrategy strategy) {
- this.coll = coll;
- this.queryIterator = queries.iterator();
- this.strategy = strategy;
- }
-
- @Override
- public boolean hasNext() {
- if (!currentCursorIsValid()) {
- findNextValidCursor();
- }
- return currentCursorIsValid();
- }
-
- @Override
- public RyaStatement next() {
- if (!currentCursorIsValid()) {
- findNextValidCursor();
- }
- if (currentCursorIsValid()) {
- // convert to Rya Statement
- DBObject queryResult = currentCursor.next();
- RyaStatement statement = strategy.deserializeDBObject(queryResult);
- return statement;
- }
- return null;
- }
-
- private void findNextValidCursor() {
- while (queryIterator.hasNext()){
- DBObject currentQuery = queryIterator.next();
- currentCursor = coll.find(currentQuery);
- if (currentCursor.hasNext()) break;
- }
- }
-
- private boolean currentCursorIsValid() {
- return (currentCursor != null) && currentCursor.hasNext();
- }
-
-
- public void setMaxResults(Long maxResults) {
- this.maxResults = maxResults;
- }
-
- @Override
- public void close() throws RyaDAOException {
- // TODO don't know what to do here
- }
-
- @Override
- public void remove() throws RyaDAOException {
- next();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java
new file mode 100644
index 0000000..77a9f16
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoConnectorFactory.java
@@ -0,0 +1,138 @@
+package mvm.rya.mongodb;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.io.IOException;
+
+import org.apache.commons.configuration.ConfigurationRuntimeException;
+import org.apache.hadoop.conf.Configuration;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.MongoException;
+import com.mongodb.ServerAddress;
+
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
+
+/**
+ * Mongo convention generally allows for a single instance of a {@link MongoClient}
+ * throughout the life cycle of an application. This MongoConnectorFactory lazy
+ * loads a Mongo Client and uses the same one whenever {@link MongoConnectorFactory#getMongoClient(Configuration)}
+ * is invoked.
+ */
+public class MongoConnectorFactory {
+ private static MongoClient mongoClient;
+
+ private final static String MSG_INTRO = "Failed to connect to MongoDB: ";
+
+ /**
+ * @param conf The {@link Configuration} defining how to construct the MongoClient.
+ * @return A {@link MongoClient}. This client is lazy loaded and the same one
+ * is used throughout the lifecycle of the application.
+ * @throws IOException - if MongodForTestsFactory constructor has an io exception.
+ * @throws ConfigurationRuntimeException - Thrown if the configured server, port, user, or others are missing.
+ * @throws MongoException if can't connect despite conf parameters are given
+ */
+ public static synchronized MongoClient getMongoClient(final Configuration conf)
+ throws ConfigurationRuntimeException, MongoException {
+ if (mongoClient == null) {
+ // The static client has not yet created, is it a test/mock instance, or a service?
+ if (conf.getBoolean(MongoDBRdfConfiguration.USE_TEST_MONGO, false)) {
+ createMongoClientForTests();
+ } else {
+ createMongoClientForServer(conf);
+ }
+ }
+ return mongoClient;
+ }
+
+ /**
+ * Create a local temporary MongoDB instance and client object and assign it to this class's static mongoClient
+ * @throws MongoException if can't connect
+ */
+ private static void createMongoClientForTests() throws MongoException {
+ try {
+ MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION);
+ mongoClient = testsFactory.newMongo();
+ } catch (IOException e) {
+ // Rethrow as an unchecked error. Since we are in a test mode here, just fail fast.
+ throw new MongoException(MSG_INTRO+"creating a factory for a test/mock MongoDB instance.",e);
+ }
+ }
+
+ /**
+ * Create a MongoDB client object and assign it to this class's static mongoClient
+ * @param conf configuration containing connection parameters
+ * @throws ConfigurationRuntimeException - Thrown if the configured server, port, user, or others are missing.
+ * @throws MongoException if can't connect despite conf parameters are given
+ */
+ private static void createMongoClientForServer(final Configuration conf)
+ throws ConfigurationRuntimeException, MongoException {
+ // Connect to a running Mongo server
+ final String host = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE), MSG_INTRO+"host name is required");
+ final int port = requireNonNullInt(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT), MSG_INTRO+"Port number is required.");
+ ServerAddress server = new ServerAddress(host, port);
+ // check for authentication credentials
+ if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) {
+ final String username = conf.get(MongoDBRdfConfiguration.MONGO_USER);
+ final String dbName = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME),
+ MSG_INTRO + MongoDBRdfConfiguration.MONGO_DB_NAME + " is null but required configuration if "
+ + MongoDBRdfConfiguration.MONGO_USER + " is configured.");
+ final char[] pswd = requireNonNull(conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD),
+ MSG_INTRO + MongoDBRdfConfiguration.MONGO_USER_PASSWORD + " is null but required configuration if "
+ + MongoDBRdfConfiguration.MONGO_USER + " is configured.").toCharArray();
+ final MongoCredential cred = MongoCredential.createCredential(username, dbName, pswd);
+ mongoClient = new MongoClient(server, Arrays.asList(cred));
+ } else {
+ // No user was configured:
+ mongoClient = new MongoClient(server);
+ }
+ }
+
+ /**
+ * Throw exception for un-configured required values.
+ *
+ * @param required String to check
+ * @param message throw configuration exception with this description
+ * @return unaltered required string
+ * @throws ConfigurationRuntimeException if required is null
+ */
+ private static String requireNonNull(String required, String message) throws ConfigurationRuntimeException {
+ if (required == null)
+ throw new ConfigurationRuntimeException(message);
+ return required;
+ }
+
+ /*
+ * Same as above, check that it is a integer and return the parsed integer.
+ */
+ private static int requireNonNullInt(String required, String message) throws ConfigurationRuntimeException {
+ if (required == null)
+ throw new ConfigurationRuntimeException(message);
+ try {
+ return Integer.parseInt(required);
+ } catch (NumberFormatException e) {
+ throw new ConfigurationRuntimeException(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
new file mode 100644
index 0000000..afa0a77
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
@@ -0,0 +1,202 @@
+package mvm.rya.mongodb;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.query.BatchRyaQuery;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import mvm.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterable;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterator;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Date: 7/17/12
+ * Time: 9:28 AM
+ */
+public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguration>, Closeable {
+
+ private MongoDBRdfConfiguration configuration;
+ private final MongoClient mongoClient;
+ private final DBCollection coll;
+ private final MongoDBStorageStrategy strategy;
+
+ public MongoDBQueryEngine(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) {
+ this.mongoClient = checkNotNull(mongoClient);
+ final DB db = mongoClient.getDB( conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
+ coll = db.getCollection(conf.getTriplesCollectionName());
+ strategy = new SimpleMongoDBStorageStrategy();
+ }
+
+
+ @Override
+ public void setConf(final MongoDBRdfConfiguration conf) {
+ configuration = conf;
+ }
+
+ @Override
+ public MongoDBRdfConfiguration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public CloseableIteration<RyaStatement, RyaDAOException> query(
+ final RyaStatement stmt, MongoDBRdfConfiguration conf)
+ throws RyaDAOException {
+ if (conf == null) {
+ conf = configuration;
+ }
+ final Long maxResults = conf.getLimit();
+ final Set<DBObject> queries = new HashSet<DBObject>();
+ final DBObject query = strategy.getQuery(stmt);
+ queries.add(query);
+ final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy);
+
+ if (maxResults != null) {
+ iterator.setMaxResults(maxResults);
+ }
+ return iterator;
+ }
+ @Override
+ public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(
+ final Collection<Entry<RyaStatement, BindingSet>> stmts,
+ MongoDBRdfConfiguration conf) throws RyaDAOException {
+ if (conf == null) {
+ conf = configuration;
+ }
+ final Long maxResults = conf.getLimit();
+ final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create();
+
+ //TODO: cannot span multiple tables here
+ try {
+ for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
+ final RyaStatement stmt = stmtbs.getKey();
+ final BindingSet bs = stmtbs.getValue();
+ final DBObject query = strategy.getQuery(stmt);
+ rangeMap.put(query, bs);
+ }
+
+ // TODO not sure what to do about regex ranges?
+ final RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator(coll, rangeMap, strategy);
+
+ if (maxResults != null) {
+ iterator.setMaxResults(maxResults);
+ }
+ return iterator;
+ } catch (final Exception e) {
+ throw new RyaDAOException(e);
+ }
+
+ }
+ @Override
+ public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
+ final Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf)
+ throws RyaDAOException {
+ if (conf == null) {
+ conf = configuration;
+ }
+ final Long maxResults = conf.getLimit();
+ final Set<DBObject> queries = new HashSet<DBObject>();
+
+ try {
+ for (final RyaStatement stmt : stmts) {
+ queries.add( strategy.getQuery(stmt));
+ }
+
+ // TODO not sure what to do about regex ranges?
+ final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(coll, queries, strategy);
+
+ if (maxResults != null) {
+ iterator.setMaxResults(maxResults);
+ }
+ return iterator;
+ } catch (final Exception e) {
+ throw new RyaDAOException(e);
+ }
+
+ }
+ @Override
+ public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery)
+ throws RyaDAOException {
+ final Set<DBObject> queries = new HashSet<DBObject>();
+
+ try {
+ queries.add( strategy.getQuery(ryaQuery));
+
+ // TODO not sure what to do about regex ranges?
+ // TODO this is gross
+ final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy)));
+
+ return iterator;
+ } catch (final Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+ @Override
+ public CloseableIterable<RyaStatement> query(final BatchRyaQuery batchRyaQuery)
+ throws RyaDAOException {
+ try {
+ final Set<DBObject> queries = new HashSet<DBObject>();
+ for (final RyaStatement statement : batchRyaQuery.getQueries()){
+ queries.add( strategy.getQuery(statement));
+
+ }
+
+ // TODO not sure what to do about regex ranges?
+ // TODO this is gross
+ final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(coll, queries, strategy)));
+
+ return iterator;
+ } catch (final Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (mongoClient != null){ mongoClient.close(); }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
new file mode 100644
index 0000000..e8e301d
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
@@ -0,0 +1,128 @@
+package mvm.rya.mongodb;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import java.util.List;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClient;
+
+public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
+ public static final String MONGO_INSTANCE = "mongo.db.instance";
+ public static final String MONGO_INSTANCE_PORT = "mongo.db.port";
+ public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist";
+ public static final String MONGO_DB_NAME = "mongo.db.name";
+ public static final String MONGO_COLLECTION_PREFIX = "mongo.db.collectionprefix";
+ public static final String MONGO_USER = "mongo.db.user";
+ public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword";
+ public static final String USE_TEST_MONGO = "mongo.db.test";
+ public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
+ private MongoClient mongoClient;
+
+ public MongoDBRdfConfiguration() {
+ super();
+ }
+
+ public MongoDBRdfConfiguration(Configuration other) {
+ super(other);
+ }
+
+ @Override
+ public MongoDBRdfConfiguration clone() {
+ return new MongoDBRdfConfiguration(this);
+ }
+
+ public boolean getUseTestMongo() {
+ return this.getBoolean(USE_TEST_MONGO, false);
+ }
+
+ public void setUseTestMongo(boolean useTestMongo) {
+ this.setBoolean(USE_TEST_MONGO, useTestMongo);
+ }
+
+ public String getTriplesCollectionName() {
+ return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_triples";
+ }
+
+ public String getCollectionName() {
+ return this.get(MONGO_COLLECTION_PREFIX, "rya");
+ }
+
+ public void setCollectionName(String name) {
+ this.set(MONGO_COLLECTION_PREFIX, name);
+ }
+
+ public String getMongoInstance() {
+ return this.get(MONGO_INSTANCE, "localhost");
+ }
+
+ public void setMongoInstance(String name) {
+ this.set(MONGO_INSTANCE, name);
+ }
+
+ public String getMongoPort() {
+ return this.get(MONGO_INSTANCE_PORT, "27017");
+ }
+
+ public void setMongoPort(String name) {
+ this.set(MONGO_INSTANCE_PORT, name);
+ }
+
+ public String getMongoDBName() {
+ return this.get(MONGO_DB_NAME, "rya");
+ }
+
+ public void setMongoDBName(String name) {
+ this.set(MONGO_DB_NAME, name);
+ }
+
+ public String getNameSpacesCollectionName() {
+ return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_ns";
+ }
+
+ public void setAdditionalIndexers(Class<? extends MongoSecondaryIndex>... indexers) {
+ List<String> strs = Lists.newArrayList();
+ for (Class<?> ai : indexers){
+ strs.add(ai.getName());
+ }
+
+ setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{}));
+ }
+
+ public List<MongoSecondaryIndex> getAdditionalIndexers() {
+ return getInstances(CONF_ADDITIONAL_INDEXERS, MongoSecondaryIndex.class);
+ }
+
+ public void setMongoClient(MongoClient client){
+ this.mongoClient = client;
+ }
+
+ public MongoClient getMongoClient() {
+ return mongoClient;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
new file mode 100644
index 0000000..bb5d58e
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -0,0 +1,233 @@
+package mvm.rya.mongodb;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.DuplicateKeyException;
+import com.mongodb.InsertOptions;
+import com.mongodb.MongoClient;
+
+import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.mongodb.dao.MongoDBNamespaceManager;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+import mvm.rya.mongodb.dao.SimpleMongoDBNamespaceManager;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+
+/**
+ * Default DAO for mongo backed RYA allowing for CRUD operations.
+ */
+public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
+ private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class);
+
+ private MongoDBRdfConfiguration conf;
+ private MongoClient mongoClient;
+ private DB db;
+ private DBCollection coll;
+ private MongoDBQueryEngine queryEngine;
+ private MongoDBStorageStrategy storageStrategy;
+ private MongoDBNamespaceManager nameSpaceManager;
+ private MongodForTestsFactory testsFactory;
+
+ private List<MongoSecondaryIndex> secondaryIndexers;
+
+ /**
+ * Creates a new {@link MongoDBRyaDAO}
+ * @param conf
+ * @throws RyaDAOException
+ */
+ public MongoDBRyaDAO(final MongoDBRdfConfiguration conf) throws RyaDAOException, NumberFormatException, UnknownHostException {
+ this.conf = conf;
+ mongoClient = MongoConnectorFactory.getMongoClient(conf);
+ conf.setMongoClient(mongoClient);
+ init();
+ }
+
+
+ public MongoDBRyaDAO(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) throws RyaDAOException{
+ this.conf = conf;
+ this.mongoClient = mongoClient;
+ conf.setMongoClient(mongoClient);
+ init();
+ }
+
+ @Override
+ public void setConf(final MongoDBRdfConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public MongoClient getMongoClient(){
+ return mongoClient;
+ }
+
+ public void setDB(final DB db) {
+ this.db = db;
+ }
+
+
+ public void setDBCollection(final DBCollection coll) {
+ this.coll = coll;
+ }
+
+ @Override
+ public MongoDBRdfConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void init() throws RyaDAOException {
+ secondaryIndexers = conf.getAdditionalIndexers();
+ for(final MongoSecondaryIndex index: secondaryIndexers) {
+ index.setConf(conf);
+ index.setClient(mongoClient);
+ }
+
+ db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
+ coll = db.getCollection(conf.getTriplesCollectionName());
+ nameSpaceManager = new SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName()));
+ queryEngine = new MongoDBQueryEngine(conf, mongoClient);
+ storageStrategy = new SimpleMongoDBStorageStrategy();
+ storageStrategy.createIndices(coll);
+ for(final MongoSecondaryIndex index: secondaryIndexers) {
+ index.init();
+ }
+ }
+
+ @Override
+ public boolean isInitialized() throws RyaDAOException {
+ return true;
+ }
+
+ @Override
+ public void destroy() throws RyaDAOException {
+ if (mongoClient != null) {
+ mongoClient.close();
+ }
+ if (conf.getUseTestMongo()) {
+ testsFactory.shutdown();
+ }
+
+ IOUtils.closeQuietly(queryEngine);
+ }
+
+ @Override
+ public void add(final RyaStatement statement) throws RyaDAOException {
+ // add it to the collection
+ try {
+ coll.insert(storageStrategy.serialize(statement));
+ for(final RyaSecondaryIndexer index: secondaryIndexers) {
+ index.storeStatement(statement);
+ }
+ } catch (IOException e) {
+ log.error("Unable to add: " + statement.toString());
+ throw new RyaDAOException(e);
+ }
+ catch (DuplicateKeyException e){
+ log.error("Attempting to load duplicate triple: " + statement.toString());
+ }
+ }
+
+ @Override
+ public void add(final Iterator<RyaStatement> statement) throws RyaDAOException {
+ final List<DBObject> dbInserts = new ArrayList<DBObject>();
+ while (statement.hasNext()){
+ final RyaStatement ryaStatement = statement.next();
+ final DBObject insert = storageStrategy.serialize(ryaStatement);
+ dbInserts.add(insert);
+
+ try {
+ for (final RyaSecondaryIndexer index : secondaryIndexers) {
+ index.storeStatement(ryaStatement);
+ }
+ } catch (final IOException e) {
+ log.error("Failed to add: " + ryaStatement.toString() + " to the indexer");
+ }
+
+ }
+ coll.insert(dbInserts, new InsertOptions().continueOnError(true));
+ }
+
+ @Override
+ public void delete(final RyaStatement statement, final MongoDBRdfConfiguration conf)
+ throws RyaDAOException {
+ final DBObject obj = storageStrategy.getQuery(statement);
+ coll.remove(obj);
+ }
+
+ @Override
+ public void dropGraph(final MongoDBRdfConfiguration conf, final RyaURI... graphs)
+ throws RyaDAOException {
+
+ }
+
+ @Override
+ public void delete(final Iterator<RyaStatement> statements,
+ final MongoDBRdfConfiguration conf) throws RyaDAOException {
+ while (statements.hasNext()){
+ final RyaStatement ryaStatement = statements.next();
+ coll.remove(storageStrategy.getQuery(ryaStatement));
+ }
+
+ }
+
+ @Override
+ public String getVersion() throws RyaDAOException {
+ return "1.0";
+ }
+
+ @Override
+ public RyaQueryEngine<MongoDBRdfConfiguration> getQueryEngine() {
+ return queryEngine;
+ }
+
+ @Override
+ public RyaNamespaceManager<MongoDBRdfConfiguration> getNamespaceManager() {
+ return nameSpaceManager;
+ }
+
+ @Override
+ public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+
+ }
+
+ @Override
+ public void dropAndDestroy() throws RyaDAOException {
+ db.dropDatabase(); // this is dangerous!
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java
new file mode 100644
index 0000000..e32216f
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoSecondaryIndex.java
@@ -0,0 +1,31 @@
+package mvm.rya.mongodb;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.mongodb.MongoClient;
+
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+public interface MongoSecondaryIndex extends RyaSecondaryIndexer{
+ public void init();
+
+ public void setClient(MongoClient client);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java
new file mode 100644
index 0000000..fd9b659
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBNamespaceManager.java
@@ -0,0 +1,35 @@
+package mvm.rya.mongodb.dao;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+public interface MongoDBNamespaceManager extends RyaNamespaceManager<MongoDBRdfConfiguration>{
+
+ public void createIndices(DBCollection coll);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java
new file mode 100644
index 0000000..5ae371b
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/MongoDBStorageStrategy.java
@@ -0,0 +1,45 @@
+package mvm.rya.mongodb.dao;
+
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.query.RyaQuery;
+
+/**
+ * Defines how objects are stored in MongoDB.
+ * <T> - The object to store in MongoDB
+ */
+public interface MongoDBStorageStrategy<T> {
+
+ public DBObject getQuery(T statement);
+
+ public RyaStatement deserializeDBObject(DBObject queryResult);
+
+ public DBObject serialize(T statement);
+
+ public DBObject getQuery(RyaQuery ryaQuery);
+
+ public void createIndices(DBCollection coll);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
new file mode 100644
index 0000000..259420b
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBNamespaceManager.java
@@ -0,0 +1,181 @@
+package mvm.rya.mongodb.dao;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.mongodb.MongoDBRdfConfiguration;
+
+import org.apache.commons.codec.binary.Hex;
+import org.openrdf.model.Namespace;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+public class SimpleMongoDBNamespaceManager implements MongoDBNamespaceManager {
+
+ public class NamespaceImplementation implements Namespace {
+
+ private String namespace;
+ private String prefix;
+
+ public NamespaceImplementation(String namespace, String prefix) {
+ this.namespace = namespace;
+ this.prefix = prefix;
+ }
+
+ @Override
+ public int compareTo(Namespace o) {
+ if (!namespace.equalsIgnoreCase(o.getName())) return namespace.compareTo(o.getName());
+ if (!prefix.equalsIgnoreCase(o.getPrefix())) return prefix.compareTo(o.getPrefix());
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return namespace;
+ }
+
+ @Override
+ public String getPrefix() {
+ return prefix;
+ }
+
+ }
+
+ public class MongoCursorIteration implements
+ CloseableIteration<Namespace, RyaDAOException> {
+ private DBCursor cursor;
+
+ public MongoCursorIteration(DBCursor cursor2) {
+ this.cursor = cursor2;
+ }
+
+ @Override
+ public boolean hasNext() throws RyaDAOException {
+ return cursor.hasNext();
+ }
+
+ @Override
+ public Namespace next() throws RyaDAOException {
+ DBObject ns = cursor.next();
+ Map values = ns.toMap();
+ String namespace = (String) values.get(NAMESPACE);
+ String prefix = (String) values.get(PREFIX);
+
+ Namespace temp = new NamespaceImplementation(namespace, prefix);
+ return temp;
+ }
+
+ @Override
+ public void remove() throws RyaDAOException {
+ next();
+ }
+
+ @Override
+ public void close() throws RyaDAOException {
+ cursor.close();
+ }
+
+ }
+
+ private static final String ID = "_id";
+ private static final String PREFIX = "prefix";
+ private static final String NAMESPACE = "namespace";
+ private MongoDBRdfConfiguration conf;
+ private DBCollection nsColl;
+
+
+ public SimpleMongoDBNamespaceManager(DBCollection nameSpaceCollection) {
+ nsColl = nameSpaceCollection;
+ }
+
+ @Override
+ public void createIndices(DBCollection coll){
+ coll.createIndex(PREFIX);
+ coll.createIndex(NAMESPACE);
+ }
+
+
+ @Override
+ public void setConf(MongoDBRdfConfiguration paramC) {
+ this.conf = paramC;
+ }
+
+ @Override
+ public MongoDBRdfConfiguration getConf() {
+ // TODO Auto-generated method stub
+ return conf;
+ }
+
+ @Override
+ public void addNamespace(String prefix, String namespace)
+ throws RyaDAOException {
+ String id = prefix;
+ byte[] bytes = id.getBytes();
+ try {
+ MessageDigest digest = MessageDigest.getInstance("SHA-1");
+ bytes = digest.digest(bytes);
+ } catch (NoSuchAlgorithmException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes)))
+ .append(PREFIX, prefix)
+ .append(NAMESPACE, namespace);
+ nsColl.insert(doc);
+
+ }
+
+ @Override
+ public String getNamespace(String prefix) throws RyaDAOException {
+ DBObject query = new BasicDBObject().append(PREFIX, prefix);
+ DBCursor cursor = nsColl.find(query);
+ String nameSpace = prefix;
+ while (cursor.hasNext()){
+ DBObject obj = cursor.next();
+ nameSpace = (String) obj.toMap().get(NAMESPACE);
+ }
+ return nameSpace;
+ }
+
+ @Override
+ public void removeNamespace(String prefix) throws RyaDAOException {
+ DBObject query = new BasicDBObject().append(PREFIX, prefix);
+ nsColl.remove(query);
+ }
+
+ @Override
+ public CloseableIteration<? extends Namespace, RyaDAOException> iterateNamespace()
+ throws RyaDAOException {
+ DBObject query = new BasicDBObject();
+ DBCursor cursor = nsColl.find(query);
+ return new MongoCursorIteration(cursor);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
new file mode 100644
index 0000000..d09316a
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java
@@ -0,0 +1,162 @@
+package mvm.rya.mongodb.dao;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.openrdf.model.vocabulary.XMLSchema.ANYURI;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.log4j.Logger;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.query.RyaQuery;
+
+/**
+ * Defines how {@link RyaStatement}s are stored in MongoDB.
+ */
+public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaStatement> {
+ private static final Logger LOG = Logger.getLogger(SimpleMongoDBStorageStrategy.class);
+ protected static final String ID = "_id";
+ protected static final String OBJECT_TYPE = "objectType";
+ protected static final String OBJECT_TYPE_VALUE = XMLSchema.ANYURI.stringValue();
+ protected static final String CONTEXT = "context";
+ protected static final String PREDICATE = "predicate";
+ protected static final String OBJECT = "object";
+ protected static final String SUBJECT = "subject";
+ public static final String TIMESTAMP = "insertTimestamp";
+ protected ValueFactoryImpl factory = new ValueFactoryImpl();
+
+ @Override
+ public void createIndices(final DBCollection coll){
+ BasicDBObject doc = new BasicDBObject();
+ doc.put(SUBJECT, 1);
+ doc.put(PREDICATE, 1);
+ coll.createIndex(doc);
+ doc = new BasicDBObject(PREDICATE, 1);
+ doc.put(OBJECT, 1);
+ doc.put(OBJECT_TYPE, 1);
+ coll.createIndex(doc);
+ doc = new BasicDBObject(OBJECT, 1);
+ doc = new BasicDBObject(OBJECT_TYPE, 1);
+ doc.put(SUBJECT, 1);
+ coll.createIndex(doc);
+ }
+
+ @Override
+ public DBObject getQuery(final RyaStatement stmt) {
+ final RyaURI subject = stmt.getSubject();
+ final RyaURI predicate = stmt.getPredicate();
+ final RyaType object = stmt.getObject();
+ final RyaURI context = stmt.getContext();
+ final BasicDBObject query = new BasicDBObject();
+ if (subject != null){
+ query.append(SUBJECT, subject.getData());
+ }
+ if (object != null){
+ query.append(OBJECT, object.getData());
+ query.append(OBJECT_TYPE, object.getDataType().toString());
+ }
+ if (predicate != null){
+ query.append(PREDICATE, predicate.getData());
+ }
+ if (context != null){
+ query.append(CONTEXT, context.getData());
+ }
+
+ return query;
+ }
+
+ @Override
+ public RyaStatement deserializeDBObject(final DBObject queryResult) {
+ final Map result = queryResult.toMap();
+ final String subject = (String) result.get(SUBJECT);
+ final String object = (String) result.get(OBJECT);
+ final String objectType = (String) result.get(OBJECT_TYPE);
+ final String predicate = (String) result.get(PREDICATE);
+ final String context = (String) result.get(CONTEXT);
+ final Long timestamp = (Long) result.get(TIMESTAMP);
+ RyaType objectRya = null;
+ if (objectType.equalsIgnoreCase(ANYURI.stringValue())){
+ objectRya = new RyaURI(object);
+ }
+ else {
+ objectRya = new RyaType(factory.createURI(objectType), object);
+ }
+
+ final RyaStatement statement;
+ if (!context.isEmpty()){
+ statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya,
+ new RyaURI(context));
+ } else {
+ statement = new RyaStatement(new RyaURI(subject), new RyaURI(predicate), objectRya);
+ }
+
+ if(timestamp != null) {
+ statement.setTimestamp(timestamp);
+ }
+ return statement;
+ }
+
+ @Override
+ public DBObject serialize(final RyaStatement statement){
+ return serializeInternal(statement);
+ }
+
+ public BasicDBObject serializeInternal(final RyaStatement statement){
+ String context = "";
+ if (statement.getContext() != null){
+ context = statement.getContext().getData();
+ }
+ final String id = statement.getSubject().getData() + " " +
+ statement.getPredicate().getData() + " " + statement.getObject().getData() + " " + context;
+ byte[] bytes = id.getBytes();
+ try {
+ final MessageDigest digest = MessageDigest.getInstance("SHA-1");
+ bytes = digest.digest(bytes);
+ } catch (final NoSuchAlgorithmException e) {
+ LOG.error("Unable to perform SHA-1 on the ID, defaulting to raw bytes.", e);
+ }
+ final BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes)))
+ .append(SUBJECT, statement.getSubject().getData())
+ .append(PREDICATE, statement.getPredicate().getData())
+ .append(OBJECT, statement.getObject().getData())
+ .append(OBJECT_TYPE, statement.getObject().getDataType().toString())
+ .append(CONTEXT, context)
+ .append(TIMESTAMP, statement.getTimestamp());
+ return doc;
+
+ }
+
+ @Override
+ public DBObject getQuery(final RyaQuery ryaQuery) {
+ return getQuery(ryaQuery.getQuery());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
new file mode 100644
index 0000000..282ecbb
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/instance/MongoDetailsAdapter.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.mongodb.instance;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map.Entry;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.mongodb.BasicDBList;
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBObject;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
+import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails;
+import mvm.rya.api.instance.RyaDetails.GeoIndexDetails;
+import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetails.ProspectorDetails;
+import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails;
+
+/**
+ * Serializes configuration details for use in Mongo.
+ * The {@link DBObject} will look like:
+ * <pre>
+ * {@code
+ * {
+ * "instanceName": <string>,
+ * "version": <string>?,
+ * "entityCentricDetails": <boolean>,
+ * "geoDetails": <boolean>,
+ * "pcjDetails": {
+ * "enabled": <boolean>,
+ * "fluoName": <string>,
+ * "pcjs": [{
+ * "id": <string>,
+ * "updateStrategy": <string>,
+ * "lastUpdate": <date>
+ * },...,{}
+ * ]
+ * },
+ * "temporalDetails": <boolean>,
+ * "freeTextDetails": <boolean>,
+ * "prospectorDetails": <date>,
+ * "joinSelectivityDetails": <date>
+ * }
+ * </pre>
+ */
+@ParametersAreNonnullByDefault
+public class MongoDetailsAdapter {
+ public static final String INSTANCE_KEY = "instanceName";
+ public static final String VERSION_KEY = "version";
+
+ public static final String ENTITY_DETAILS_KEY = "entityCentricDetails";
+ public static final String GEO_DETAILS_KEY = "geoDetails";
+ public static final String PCJ_DETAILS_KEY = "pcjDetails";
+ public static final String PCJ_ENABLED_KEY = "enabled";
+ public static final String PCJ_FLUO_KEY = "fluoName";
+ public static final String PCJ_PCJS_KEY = "pcjs";
+ public static final String PCJ_ID_KEY = "id";
+ public static final String PCJ_UPDATE_STRAT_KEY = "updateStrategy";
+ public static final String PCJ_LAST_UPDATE_KEY = "lastUpdate";
+ public static final String TEMPORAL_DETAILS_KEY = "temporalDetails";
+ public static final String FREETEXT_DETAILS_KEY = "freeTextDetails";
+
+ public static final String PROSPECTOR_DETAILS_KEY = "prospectorDetails";
+ public static final String JOIN_SELECTIVITY_DETAILS_KEY = "joinSelectivitiyDetails";
+
+ /**
+ * Serializes {@link RyaDetails} to mongo {@link DBObject}.
+ * @param details - The details to be serialized.
+ * @return The mongo {@link DBObject}.
+ */
+ public static BasicDBObject toDBObject(final RyaDetails details) {
+ Preconditions.checkNotNull(details);
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start()
+ .add(INSTANCE_KEY, details.getRyaInstanceName())
+ .add(VERSION_KEY, details.getRyaVersion())
+ .add(ENTITY_DETAILS_KEY, details.getEntityCentricIndexDetails().isEnabled())
+ .add(GEO_DETAILS_KEY, details.getGeoIndexDetails().isEnabled())
+ .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails()))
+ .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled())
+ .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled());
+ if(details.getProspectorDetails().getLastUpdated().isPresent()) {
+ builder.add(PROSPECTOR_DETAILS_KEY, details.getProspectorDetails().getLastUpdated().get());
+ }
+ if(details.getJoinSelectivityDetails().getLastUpdated().isPresent()) {
+ builder.add(JOIN_SELECTIVITY_DETAILS_KEY, details.getJoinSelectivityDetails().getLastUpdated().get());
+ }
+ return (BasicDBObject) builder.get();
+ }
+
+ private static DBObject toDBObject(final PCJIndexDetails pcjIndexDetails) {
+ requireNonNull(pcjIndexDetails);
+
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+
+ // Is Enabled
+ builder.add(PCJ_ENABLED_KEY, pcjIndexDetails.isEnabled());
+
+ // Fluo Details if present.
+ if(pcjIndexDetails.getFluoDetails().isPresent()) {
+ builder.add(PCJ_FLUO_KEY, pcjIndexDetails.getFluoDetails().get().getUpdateAppName());
+ }
+
+ // Add the PCJDetail objects.
+ final List<DBObject> pcjDetailsList = new ArrayList<>();
+ for(final PCJDetails pcjDetails : pcjIndexDetails.getPCJDetails().values()) {
+ pcjDetailsList.add( toDBObject( pcjDetails ) );
+ }
+ builder.add(PCJ_PCJS_KEY, pcjDetailsList.toArray());
+
+ return builder.get();
+ }
+
+ static DBObject toDBObject(final PCJDetails pcjDetails) {
+ requireNonNull(pcjDetails);
+
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+
+ // PCJ ID
+ builder.add(PCJ_ID_KEY, pcjDetails.getId());
+
+ // PCJ Update Strategy if present.
+ if(pcjDetails.getUpdateStrategy().isPresent()) {
+ builder.add(PCJ_UPDATE_STRAT_KEY, pcjDetails.getUpdateStrategy().get().name());
+ }
+
+ // Last Update Time if present.
+ if(pcjDetails.getLastUpdateTime().isPresent()) {
+ builder.add(PCJ_LAST_UPDATE_KEY, pcjDetails.getLastUpdateTime().get());
+ }
+
+ return builder.get();
+ }
+
+ public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException {
+ final BasicDBObject basicObj = (BasicDBObject) mongoObj;
+ try {
+ return RyaDetails.builder()
+ .setRyaInstanceName(basicObj.getString(INSTANCE_KEY))
+ .setRyaVersion(basicObj.getString(VERSION_KEY))
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY)))
+ .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY)))
+ .setPCJIndexDetails(getPCJIndexDetails(basicObj))
+ .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY)))
+ .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY)))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY))))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(basicObj.getDate(JOIN_SELECTIVITY_DETAILS_KEY))))
+ .build();
+ } catch(final Exception e) {
+ throw new MalformedRyaDetailsException("Failed to make RyaDetail from Mongo Object, it is malformed.", e);
+ }
+ }
+
+ private static PCJIndexDetails.Builder getPCJIndexDetails(final BasicDBObject basicObj) {
+ final BasicDBObject pcjIndexDBO = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY);
+
+ final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder()
+ .setEnabled(pcjIndexDBO.getBoolean(PCJ_ENABLED_KEY))
+ .setFluoDetails(new FluoDetails(pcjIndexDBO.getString(PCJ_FLUO_KEY)));
+
+ final BasicDBList pcjs = (BasicDBList) pcjIndexDBO.get(PCJ_PCJS_KEY);
+ if(pcjs != null) {
+ for(int ii = 0; ii < pcjs.size(); ii++) {
+ final BasicDBObject pcj = (BasicDBObject) pcjs.get(ii);
+ pcjBuilder.addPCJDetails( toPCJDetails(pcj) );
+ }
+ }
+ return pcjBuilder;
+ }
+
+ static PCJDetails.Builder toPCJDetails(final BasicDBObject dbo) {
+ requireNonNull(dbo);
+
+ // PCJ ID.
+ final PCJDetails.Builder builder = PCJDetails.builder()
+ .setId( dbo.getString(PCJ_ID_KEY) );
+
+ // PCJ Update Strategy if present.
+ if(dbo.containsField(PCJ_UPDATE_STRAT_KEY)) {
+ builder.setUpdateStrategy( PCJUpdateStrategy.valueOf( dbo.getString(PCJ_UPDATE_STRAT_KEY) ) );
+ }
+
+ // Last Update Time if present.
+ if(dbo.containsField(PCJ_LAST_UPDATE_KEY)) {
+ builder.setLastUpdateTime( dbo.getDate(PCJ_LAST_UPDATE_KEY) );
+ }
+
+ return builder;
+ }
+
+ /**
+ * Exception thrown when a MongoDB {@link DBObject} is malformed when attemptin
+ * to adapt it into a {@link RyaDetails}.
+ */
+ public static class MalformedRyaDetailsException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new {@link MalformedRyaDetailsException}
+ * @param message - The message to be displayed by the exception.
+ * @param e - The source cause of the exception.
+ */
+ public MalformedRyaDetailsException(final String message, final Throwable e) {
+ super(message, e);
+ }
+ }
+}