You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/08/30 20:31:49 UTC
[11/14] incubator-rya git commit: RYA-324,
RYA-272 Geo refactoring and examples closes #182
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java
new file mode 100644
index 0000000..db7af05
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.Md5Hash;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.StatementSerializer;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery;
+import org.geotools.data.DataStore;
+import org.geotools.data.DataUtilities;
+import org.geotools.data.FeatureSource;
+import org.geotools.data.FeatureStore;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.factory.Hints;
+import org.geotools.feature.DefaultFeatureCollection;
+import org.geotools.feature.SchemaException;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory;
+import org.opengis.filter.identity.Identifier;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+
+import info.aduna.iteration.CloseableIteration;
+import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter;
+import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStore;
+import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStoreFactory;
+import mil.nga.giat.geowave.adapter.vector.plugin.GeoWavePluginException;
+import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery;
+import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider;
+import mil.nga.giat.geowave.core.store.CloseableIterator;
+import mil.nga.giat.geowave.core.store.StoreFactoryFamilySpi;
+import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
+import mil.nga.giat.geowave.core.store.memory.MemoryStoreFactoryFamily;
+import mil.nga.giat.geowave.core.store.query.EverythingQuery;
+import mil.nga.giat.geowave.core.store.query.QueryOptions;
+import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore;
+import mil.nga.giat.geowave.datastore.accumulo.AccumuloStoreFactoryFamily;
+
+/**
+ * A {@link GeoIndexer} wrapper around a GeoWave {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the
+ * RDF Feature Type, and interacts with the Datastore.
+ * <p>
+ * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature
+ * contains the standard set of GeoWave attributes (Geometry, Start Date, and End Date). The GeoWaveGeoIndexer populates the Geometry
+ * attribute by parsing the Well-Known Text contained in the RDF Statement’s object literal value.
+ * <p>
+ * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are:
+ * <p>
+ * <table border="1">
+ * <tr>
+ * <th>Name</th>
+ * <th>Symbol</th>
+ * <th>Type</th>
+ * </tr>
+ * <tr>
+ * <td>Subject Attribute</td>
+ * <td>S</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Predicate Attribute</td>
+ * <td>P</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Object Attribute</td>
+ * <td>O</td>
+ * <td>String</td>
+ * </tr>
+ * <tr>
+ * <td>Context Attribute</td>
+ * <td>C</td>
+ * <td>String</td>
+ * </tr>
+ * </table>
+ */
+public class GeoWaveGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer {
+
+ // Suffix appended to the Rya instance name to form this index's table name (see makeTableName).
+ private static final String TABLE_SUFFIX = "geo";
+
+ private static final Logger logger = Logger.getLogger(GeoWaveGeoIndexer.class);
+
+ // Name of the feature type under which every statement is stored.
+ private static final String FEATURE_NAME = "RDF";
+
+ // Names of the feature attributes holding the serialized subject, predicate, object and context.
+ private static final String SUBJECT_ATTRIBUTE = "S";
+ private static final String PREDICATE_ATTRIBUTE = "P";
+ private static final String OBJECT_ATTRIBUTE = "O";
+ private static final String CONTEXT_ATTRIBUTE = "C";
+ // Name of the attribute that duplicates the feature ID (hash of the statement) so filters can reference it.
+ private static final String GEO_ID_ATTRIBUTE = "geo_id";
+ // Name of the Geometry-typed attribute in the feature schema.
+ private static final String GEOMETRY_ATTRIBUTE = "geowave_index_geometry";
+
+ // Predicates to index; an empty set means every predicate is accepted.
+ private Set<URI> validPredicates;
+ private Configuration conf;
+ // featureStore is the same object as featureSource, cast once it is known to be a FeatureStore.
+ private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore;
+ private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource;
+ private SimpleFeatureType featureType;
+ private FeatureDataAdapter featureDataAdapter;
+ private DataStore geoToolsDataStore;
+ // GeoWave store obtained from the GeoTools data store; used for queries and purge.
+ private mil.nga.giat.geowave.core.store.DataStore geoWaveDataStore;
+ private final PrimaryIndex index = new SpatialDimensionalityTypeProvider().createPrimaryIndex();
+ // Set to true once setConf has completed initInternal() without an IOException.
+ private boolean isInit = false;
+
+ /**
+  * Stores the configuration and, the first time it succeeds, initializes the index.
+  * Initialization occurs here because the index is created using reflection.
+  *
+  * @param conf the {@link Configuration} to use.
+  * @throws RuntimeException wrapping the {@link IOException} if initialization fails.
+  */
+ @Override
+ public void setConf(final Configuration conf) {
+     this.conf = conf;
+     if (isInit) {
+         // Already initialized by an earlier call; only the stored configuration changes.
+         return;
+     }
+     try {
+         initInternal();
+     } catch (final IOException e) {
+         logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
+         throw new RuntimeException(e);
+     }
+     isInit = true;
+ }
+
+ /**
+  * @return the {@link Configuration} most recently passed to {@link #setConf(Configuration)}.
+  */
+ @Override
+ public Configuration getConf() {
+     return this.conf;
+ }
+
+ /**
+  * @return the internal GeoTools {@link DataStore} used by the {@link GeoWaveGeoIndexer}.
+  */
+ public DataStore getGeoToolsDataStore() {
+     return this.geoToolsDataStore;
+ }
+
+ /**
+  * @return the internal GeoWave {@link mil.nga.giat.geowave.core.store.DataStore} used by the
+  *         {@link GeoWaveGeoIndexer}.
+  */
+ public mil.nga.giat.geowave.core.store.DataStore getGeoWaveDataStore() {
+     return this.geoWaveDataStore;
+ }
+
+ /**
+  * Reads the geo predicates from the configuration, creates the GeoTools/GeoWave data stores,
+  * resolves (or creates) the RDF feature type and obtains the feature store used for writes.
+  *
+  * @throws IOException if the data store or the feature type cannot be created.
+  * @throws IllegalStateException if the feature source is not a {@link FeatureStore}.
+  */
+ private void initInternal() throws IOException {
+     validPredicates = ConfigUtils.getGeoPredicates(conf);
+
+     try {
+         geoToolsDataStore = createDataStore(conf);
+         geoWaveDataStore = ((GeoWaveGTDataStore) geoToolsDataStore).getDataStore();
+     } catch (final GeoWavePluginException e) {
+         // Fail fast: continuing with a null data store would only surface later as a
+         // NullPointerException that hides the real cause.
+         throw new IOException("Failed to create GeoWave data store", e);
+     }
+
+     try {
+         featureType = getStatementFeatureType(geoToolsDataStore);
+     } catch (final SchemaException e) {
+         // An IOException from the call above propagates unchanged; only the schema failure needs wrapping.
+         throw new IOException(e);
+     }
+
+     featureDataAdapter = new FeatureDataAdapter(featureType);
+
+     featureSource = geoToolsDataStore.getFeatureSource(featureType.getName());
+     if (!(featureSource instanceof FeatureStore)) {
+         throw new IllegalStateException("Could not retrieve feature store");
+     }
+     featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource;
+ }
+
+ /**
+  * Builds the parameter map handed to the GeoWave GeoTools data store factory.
+  *
+  * @param conf the {@link Configuration} to read the connection settings from.
+  * @return a new mutable map of data store parameters.
+  */
+ public Map<String, Serializable> getParams(final Configuration conf) {
+     // Connection settings taken from the configuration.
+     final Instance accumuloInstance = ConfigUtils.getInstance(conf);
+     final String instanceName = accumuloInstance.getInstanceName();
+     final String zooKeeperHosts = accumuloInstance.getZooKeepers();
+     final String userName = ConfigUtils.getUsername(conf);
+     final String userPassword = ConfigUtils.getPassword(conf);
+     // Read but not placed in the parameter map.
+     final String auths = ConfigUtils.getAuthorizations(conf).toString();
+     final String namespace = getTableName(conf);
+     final String ryaPrefix = ConfigUtils.getTablePrefix(conf);
+
+     final Map<String, Serializable> dataStoreParams = new HashMap<>();
+     dataStoreParams.put("zookeeper", zooKeeperHosts);
+     dataStoreParams.put("instance", instanceName);
+     dataStoreParams.put("user", userName);
+     dataStoreParams.put("password", userPassword);
+     dataStoreParams.put("namespace", namespace);
+     dataStoreParams.put("gwNamespace", ryaPrefix + getClass().getSimpleName());
+
+     // Fixed GeoWave plugin options.
+     dataStoreParams.put("Lock Management", LockManagementType.MEMORY.toString());
+     dataStoreParams.put("Authorization Management Provider", AuthorizationManagementProviderType.EMPTY.toString());
+     dataStoreParams.put("Authorization Data URL", null);
+     dataStoreParams.put("Transaction Buffer Size", 10000);
+     dataStoreParams.put("Query Index Strategy", QueryIndexStrategyType.HEURISTIC_MATCH.toString());
+     return dataStoreParams;
+ }
+
+ /**
+  * Creates the {@link DataStore} for the {@link GeoWaveGeoIndexer}. An in-memory store family is
+  * used when the configured Accumulo instance is a {@link MockInstance}; otherwise the Accumulo
+  * store family is used.
+  * @param conf the {@link Configuration}.
+  * @return the {@link DataStore}.
+  */
+ public DataStore createDataStore(final Configuration conf) throws IOException, GeoWavePluginException {
+     final Map<String, Serializable> params = getParams(conf);
+
+     final StoreFactoryFamilySpi storeFactoryFamily = (ConfigUtils.getInstance(conf) instanceof MockInstance)
+             ? new MemoryStoreFactoryFamily()
+             : new AccumuloStoreFactoryFamily();
+
+     return new GeoWaveGTDataStoreFactory(storeFactoryFamily).createNewDataStore(params);
+ }
+
+ /**
+  * Returns the RDF feature type from the data store, creating and registering its schema
+  * if the store does not list it yet.
+  *
+  * @param dataStore the GeoTools {@link DataStore} to look the type up in.
+  * @return the existing or newly created {@link SimpleFeatureType}.
+  */
+ private static SimpleFeatureType getStatementFeatureType(final DataStore dataStore) throws IOException, SchemaException {
+     final List<String> knownTypeNames = Arrays.asList(dataStore.getTypeNames());
+     if (knownTypeNames.contains(FEATURE_NAME)) {
+         return dataStore.getSchema(FEATURE_NAME);
+     }
+
+     // Schema spec: the four statement components, the geometry (SRID 4326) and the queryable ID.
+     final String typeSpec = SUBJECT_ATTRIBUTE + ":String,"
+             + PREDICATE_ATTRIBUTE + ":String,"
+             + OBJECT_ATTRIBUTE + ":String,"
+             + CONTEXT_ATTRIBUTE + ":String,"
+             + GEOMETRY_ATTRIBUTE + ":Geometry:srid=4326,"
+             + GEO_ID_ATTRIBUTE + ":String";
+
+     final SimpleFeatureType createdType = DataUtilities.createType(FEATURE_NAME, typeSpec);
+     dataStore.createSchema(createdType);
+     return createdType;
+ }
+
+ /**
+  * Converts every indexable statement into a feature and writes the features to the feature
+  * store in one call. Statements whose predicate is not accepted, whose object is not a
+  * {@link Literal}, or whose geometry cannot be parsed are skipped (the latter with a warning).
+  *
+  * @param ryaStatements the statements to index.
+  * @throws IOException if writing the features fails.
+  */
+ @Override
+ public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
+     final DefaultFeatureCollection pendingFeatures = new DefaultFeatureCollection();
+
+     for (final RyaStatement ryaStatement : ryaStatements) {
+         final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+
+         // An empty predicate list accepts everything; otherwise the predicate must be listed.
+         final boolean predicateRejected = !validPredicates.isEmpty() && !validPredicates.contains(statement.getPredicate());
+         if (predicateRejected || !(statement.getObject() instanceof Literal)) {
+             continue;
+         }
+
+         try {
+             pendingFeatures.add(createFeature(featureType, statement));
+         } catch (final ParseException e) {
+             logger.warn("Error getting geo from statement: " + statement.toString(), e);
+         }
+     }
+
+     // Only touch the store when there is something to write.
+     if (pendingFeatures.isEmpty()) {
+         return;
+     }
+     featureStore.addFeatures(pendingFeatures);
+ }
+
+ /**
+  * Indexes a single statement by delegating to {@link #storeStatements(Collection)}.
+  *
+  * @param statement the statement to index.
+  * @throws IOException if writing the feature fails.
+  */
+ @Override
+ public void storeStatement(final RyaStatement statement) throws IOException {
+     final Collection<RyaStatement> single = Collections.singleton(statement);
+     storeStatements(single);
+ }
+
+ /**
+  * Builds the feature that represents a statement: the geometry parsed from the statement, the
+  * serialized subject/predicate/object/context, and an ID derived from a hash of the statement.
+  *
+  * @param featureType the feature type to build the feature for.
+  * @param statement the statement to convert.
+  * @return the populated feature.
+  * @throws ParseException if no non-empty, valid geometry can be obtained from the statement.
+  */
+ private static SimpleFeature createFeature(final SimpleFeatureType featureType, final Statement statement) throws ParseException {
+     final String subjectText = StatementSerializer.writeSubject(statement);
+     final String predicateText = StatementSerializer.writePredicate(statement);
+     final String objectText = StatementSerializer.writeObject(statement);
+     final String contextText = StatementSerializer.writeContext(statement);
+
+     // The feature ID is a hash of the whole serialized statement.
+     final String featureId = Md5Hash.md5Base64(StatementSerializer.writeStatement(statement));
+     final SimpleFeature feature = SimpleFeatureBuilder.build(featureType, new Object[0], featureId);
+
+     final Geometry geometry = GeoParseUtils.getGeometry(statement, new GmlParser());
+     final boolean unusable = geometry == null || geometry.isEmpty() || !geometry.isValid();
+     if (unusable) {
+         throw new ParseException("Could not create geometry for statement " + statement);
+     }
+     feature.setDefaultGeometry(geometry);
+
+     feature.setAttribute(SUBJECT_ATTRIBUTE, subjectText);
+     feature.setAttribute(PREDICATE_ATTRIBUTE, predicateText);
+     feature.setAttribute(OBJECT_ATTRIBUTE, objectText);
+     feature.setAttribute(CONTEXT_ATTRIBUTE, contextText);
+     // GeoWave does not support querying based on a user generated feature ID,
+     // so the ID is duplicated in a separate attribute that it can query on.
+     feature.setAttribute(GEO_ID_ATTRIBUTE, featureId);
+
+     // Keep the ID created above (a FALSE hint would have GeoTools generate IDs).
+     feature.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+
+     return feature;
+ }
+
+ /**
+  * Builds an ECQL filter from a spatial predicate, the query geometry and the optional
+  * subject/context/predicate constraints, and returns a lazily evaluated iteration over the
+  * matching statements.
+  *
+  * @param type the ECQL spatial predicate name, e.g. {@code INTERSECTS}.
+  * @param geometry the query geometry; its string form is embedded in the filter.
+  * @param contraints the additional statement constraints; their clauses are joined with AND.
+  * @return an iteration over the matching statements.
+  */
+ private CloseableIteration<Statement, QueryEvaluationException> performQuery(final String type, final Geometry geometry,
+         final StatementConstraints contraints) {
+     final List<String> filterParms = new ArrayList<>();
+
+     filterParms.add(type + "(" + GEOMETRY_ATTRIBUTE + ", " + geometry + " )");
+
+     if (contraints.hasSubject()) {
+         filterParms.add("( " + SUBJECT_ATTRIBUTE + "= " + quoteCqlLiteral(String.valueOf(contraints.getSubject())) + ") ");
+     }
+     if (contraints.hasContext()) {
+         filterParms.add("( " + CONTEXT_ATTRIBUTE + "= " + quoteCqlLiteral(String.valueOf(contraints.getContext())) + ") ");
+     }
+     if (contraints.hasPredicates()) {
+         // Any one of the requested predicates may match.
+         final List<String> predicates = new ArrayList<>();
+         for (final URI u : contraints.getPredicates()) {
+             predicates.add("( " + PREDICATE_ATTRIBUTE + "= " + quoteCqlLiteral(u.stringValue()) + ") ");
+         }
+         filterParms.add("(" + StringUtils.join(predicates, " OR ") + ")");
+     }
+
+     final String filterString = StringUtils.join(filterParms, " AND ");
+     logger.info("Performing geowave query : " + filterString);
+
+     return getIteratorWrapper(filterString);
+ }
+
+ /**
+  * Wraps a value in single quotes for use as an ECQL string literal, doubling embedded single
+  * quotes so a value containing {@code '} can neither break nor alter the filter.
+  */
+ private static String quoteCqlLiteral(final String value) {
+     return "'" + value.replace("'", "''") + "'";
+ }
+
+ private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final String filterString) {
+
+ return new CloseableIteration<Statement, QueryEvaluationException>() {
+
+ private CloseableIterator<SimpleFeature> featureIterator = null;
+
+ CloseableIterator<SimpleFeature> getIterator() throws QueryEvaluationException {
+ if (featureIterator == null) {
+ Filter cqlFilter;
+ try {
+ cqlFilter = ECQL.toFilter(filterString);
+ } catch (final CQLException e) {
+ logger.error("Error parsing query: " + filterString, e);
+ throw new QueryEvaluationException(e);
+ }
+
+ final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, featureDataAdapter);
+ final QueryOptions queryOptions = new QueryOptions(featureDataAdapter, index);
+
+ try {
+ featureIterator = geoWaveDataStore.query(queryOptions, cqlQuery);
+ } catch (final Exception e) {
+ logger.error("Error performing query: " + filterString, e);
+ throw new QueryEvaluationException(e);
+ }
+ }
+ return featureIterator;
+ }
+
+ @Override
+ public boolean hasNext() throws QueryEvaluationException {
+ return getIterator().hasNext();
+ }
+
+ @Override
+ public Statement next() throws QueryEvaluationException {
+ final SimpleFeature feature = getIterator().next();
+ final String subjectString = feature.getAttribute(SUBJECT_ATTRIBUTE).toString();
+ final String predicateString = feature.getAttribute(PREDICATE_ATTRIBUTE).toString();
+ final String objectString = feature.getAttribute(OBJECT_ATTRIBUTE).toString();
+ final Object context = feature.getAttribute(CONTEXT_ATTRIBUTE);
+ final String contextString = context != null ? context.toString() : "";
+ final Statement statement = StatementSerializer.readStatement(subjectString, predicateString, objectString, contextString);
+ return statement;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove not implemented");
+ }
+
+ @Override
+ public void close() throws QueryEvaluationException {
+ try {
+ getIterator().close();
+ } catch (final IOException e) {
+ throw new QueryEvaluationException(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Finds the indexed statements that match an {@code EQUALS} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryEquals(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("EQUALS", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match a {@code DISJOINT} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("DISJOINT", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match an {@code INTERSECTS} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("INTERSECTS", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match a {@code TOUCHES} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryTouches(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("TOUCHES", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match a {@code CROSSES} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("CROSSES", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match a {@code WITHIN} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryWithin(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("WITHIN", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match a {@code CONTAINS} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryContains(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("CONTAINS", query, contraints);
+ }
+
+ /**
+ * Finds the indexed statements that match an {@code OVERLAPS} filter on the query geometry plus the given constraints.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(final Geometry query, final StatementConstraints contraints) {
+ return performQuery("OVERLAPS", query, contraints);
+ }
+
+ /**
+ * Near queries are not implemented by this indexer.
+ *
+ * @throws UnsupportedOperationException always.
+ */
+ @Override
+ public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query,
+ final StatementConstraints contraints) {
+ throw new UnsupportedOperationException("Near queries are not supported in Accumulo.");
+ }
+
+ /**
+ * @return the predicates read from the configuration during initialization. The internal set itself is returned,
+ * not a copy. An empty set means every predicate is indexed.
+ */
+ @Override
+ public Set<URI> getIndexablePredicates() {
+ return validPredicates;
+ }
+
+ /**
+ * Currently a no-op: this indexer keeps no buffer of its own to flush.
+ */
+ @Override
+ public void flush() throws IOException {
+ // TODO cache and flush features instead of writing them one at a time
+ }
+
+ /**
+ * Closes the indexer. Only calls {@link #flush()}; the data stores are not disposed here.
+ */
+ @Override
+ public void close() throws IOException {
+ flush();
+ }
+
+
+ /**
+ * @return the table name derived from the configuration currently held by this indexer.
+ */
+ @Override
+ public String getTableName() {
+ return getTableName(conf);
+ }
+
+ /**
+  * Get the Accumulo table that will be used by this index.
+  * @param conf the {@link Configuration} to read the table prefix from.
+  * @return table name guaranteed to be used by instances of this index
+  */
+ public static String getTableName(final Configuration conf) {
+     final String tablePrefix = ConfigUtils.getTablePrefix(conf);
+     return makeTableName(tablePrefix);
+ }
+
+ /**
+  * Make the Accumulo table name used by this indexer for a specific instance of Rya.
+  * The name is the Rya instance name followed by the indexer's table suffix.
+  *
+  * @param ryaInstanceName - The name of the Rya instance the table name is for. (not null)
+  * @return The Accumulo table name used by this indexer for a specific instance of Rya.
+  */
+ public static String makeTableName(final String ryaInstanceName) {
+     return requireNonNull(ryaInstanceName) + TABLE_SUFFIX;
+ }
+
+ /**
+  * Removes the features that correspond to the given statements. The feature IDs are recomputed
+  * from the statements the same way as when storing, and the matching features are removed with
+  * a filter on the ID attribute. Statements that would not have been indexed are ignored.
+  *
+  * @param ryaStatements the statements to remove from the index.
+  * @throws IOException if the delete filter cannot be built or the removal fails.
+  */
+ private void deleteStatements(final Collection<RyaStatement> ryaStatements) throws IOException {
+     // create a feature collection
+     final DefaultFeatureCollection featureCollection = new DefaultFeatureCollection();
+
+     for (final RyaStatement ryaStatement : ryaStatements) {
+         final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+         // if the predicate list is empty, accept all predicates.
+         // Otherwise, make sure the predicate is on the "valid" list
+         final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
+
+         if (isValidPredicate && (statement.getObject() instanceof Literal)) {
+             try {
+                 final SimpleFeature feature = createFeature(featureType, statement);
+                 featureCollection.add(feature);
+             } catch (final ParseException e) {
+                 logger.warn("Error getting geo from statement: " + statement.toString(), e);
+             }
+         }
+     }
+
+     // remove this feature collection from the store
+     if (!featureCollection.isEmpty()) {
+         final FilterFactory filterFactory = CommonFactoryFinder.getFilterFactory(null);
+         final Set<String> stringIds = DataUtilities.fidSet(featureCollection);
+
+         // Produces 'id1','id2',... for the IN clause.
+         final String filterString = stringIds.stream().collect(Collectors.joining("','", "'", "'"));
+
+         final Filter filter;
+         try {
+             filter = ECQL.toFilter(GEO_ID_ATTRIBUTE + " IN (" + filterString + ")", filterFactory);
+         } catch (final CQLException e) {
+             // Never hand a null filter to the store; report the failure to the caller instead.
+             throw new IOException("Unable to generate filter for deleting the statement.", e);
+         }
+
+         featureStore.removeFeatures(filter);
+     }
+ }
+
+
+ /**
+ * Removes a single statement from the index by delegating to the bulk delete.
+ *
+ * @param statement the statement to remove.
+ * @throws IOException if the removal fails.
+ */
+ @Override
+ public void deleteStatement(final RyaStatement statement) throws IOException {
+ deleteStatements(Collections.singleton(statement));
+ }
+
+ /**
+ * No-op: initialization is performed in {@link #setConf(Configuration)}.
+ */
+ @Override
+ public void init() {
+ }
+
+ /**
+ * No-op: the supplied connector is ignored by this indexer.
+ */
+ @Override
+ public void setConnector(final Connector connector) {
+ }
+
+ /**
+ * No-op in this indexer.
+ */
+ @Override
+ public void destroy() {
+ }
+
+ /**
+ * Issues a delete for everything in the GeoWave data store. The configuration argument is not used,
+ * and the result of the delete call is not checked.
+ */
+ @Override
+ public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+ // delete existing data
+ geoWaveDataStore.delete(new QueryOptions(), new EverythingQuery());
+ }
+
+ /**
+ * No-op in this indexer.
+ */
+ @Override
+ public void dropAndDestroy() {
+ }
+
+ /**
+  * The Geo Wave {@code LockingManagementFactory} types supported by this indexer.
+  * {@link #toString()} yields the name that is passed in the data store parameters.
+  */
+ private enum LockManagementType {
+     MEMORY("memory");
+
+     private final String name;
+
+     /**
+      * Creates a new {@link LockManagementType}.
+      * @param name the name of the type. (not {@code null})
+      */
+     LockManagementType(final String name) {
+         this.name = checkNotNull(name);
+     }
+
+     @Override
+     public String toString() {
+         return this.name;
+     }
+ }
+
+ /**
+  * The Geo Wave {@code AuthorizationFactorySPI} types supported by this indexer.
+  * {@link #toString()} yields the name that is passed in the data store parameters.
+  */
+ private enum AuthorizationManagementProviderType {
+     EMPTY("empty"),
+     JSON_FILE("jsonFile");
+
+     private final String name;
+
+     /**
+      * Creates a new {@link AuthorizationManagementProviderType}.
+      * @param name the name of the type. (not {@code null})
+      */
+     AuthorizationManagementProviderType(final String name) {
+         this.name = checkNotNull(name);
+     }
+
+     @Override
+     public String toString() {
+         return this.name;
+     }
+ }
+
+ /**
+  * The Geo Wave {@code IndexQueryStrategySPI} types supported by this indexer.
+  * {@link #toString()} yields the name that is passed in the data store parameters.
+  */
+ private enum QueryIndexStrategyType {
+     BEST_MATCH("Best Match"),
+     HEURISTIC_MATCH("Heuristic Match"),
+     PRESERVE_LOCALITY("Preserve Locality");
+
+     private final String name;
+
+     /**
+      * Creates a new {@link QueryIndexStrategyType}.
+      * @param name the name of the type. (not {@code null})
+      */
+     QueryIndexStrategyType(final String name) {
+         this.name = checkNotNull(name);
+     }
+
+     @Override
+     public String toString() {
+         return this.name;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java
new file mode 100644
index 0000000..af72b3a
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/accumulo/geo/GmlParser.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import java.io.IOException;
+import java.io.Reader;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser;
+import org.geotools.gml3.GMLConfiguration;
+import org.xml.sax.SAXException;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+
+/**
+ * This wraps geotools parser for rya.geoCommon that cannot be dependent on geotools.
+ *
+ */
+public class GmlParser implements GmlToGeometryParser {
+
+ /* (non-Javadoc)
+ * @see org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser#parse(java.io.Reader)
+ */
+ @Override
+ public Geometry parse(Reader reader) throws IOException, SAXException, ParserConfigurationException {
+ final org.geotools.xml.Parser gmlParser = new org.geotools.xml.Parser(new GMLConfiguration());
+ return (Geometry) gmlParser.parse(reader);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java
new file mode 100644
index 0000000..33a4bec
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/main/java/org/apache/rya/indexing/geoExamples/GeowaveDirectExample.java
@@ -0,0 +1,445 @@
+package org.apache.rya.indexing.geoExamples;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.GeoRyaSailFactory;
+import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.QueryResultHandlerException;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandler;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.Update;
+import org.openrdf.query.UpdateExecutionException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+public class GeowaveDirectExample {
+ private static final Logger log = Logger.getLogger(GeowaveDirectExample.class);
+
+ //
+ // Connection configuration parameters
+ //
+
+ private static final boolean USE_MOCK_INSTANCE = true;
+ private static final boolean PRINT_QUERIES = true;
+ private static final String INSTANCE = "instance";
+ private static final String RYA_TABLE_PREFIX = "x_test_triplestore_";
+ private static final String AUTHS = "U";
+
+ /**
+  * Builds the example configuration with the GeoWave geo indexer enabled, opens a Sail
+  * repository connection, runs the four example scenarios in order, and logs the elapsed
+  * time in seconds. The connection and repository are always closed afterwards.
+  *
+  * @param args unused
+  * @throws Exception if the repository cannot be created or an example fails
+  */
+ public static void main(final String[] args) throws Exception {
+     final Configuration conf = getConf();
+     conf.set(PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.name());
+     conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, PRINT_QUERIES);
+     conf.setBoolean(OptionalConfigUtils.USE_GEO, true);
+     conf.setEnum(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE);
+
+     log.info("Creating the tables as root.");
+
+     SailRepository repository = null;
+     SailRepositoryConnection conn = null;
+     try {
+         log.info("Connecting to Geo Sail Repository.");
+         repository = new SailRepository(GeoRyaSailFactory.getInstance(conf));
+         conn = repository.getConnection();
+
+         final long startMillis = System.currentTimeMillis();
+
+         log.info("Running SPARQL Example: Add Point and Geo Search with PCJ");
+         testAddPointAndWithinSearchWithPCJ(conn);
+
+         log.info("Running SPARQL Example: Temporal, Freetext, and Geo Search");
+         testTemporalFreeGeoSearch(conn);
+
+         log.info("Running SPARQL Example: Geo, Freetext, and PCJ Search");
+         testGeoFreetextWithPCJSearch(conn);
+
+         log.info("Running SPARQL Example: Delete Geo Data");
+         testDeleteGeoData(conn);
+
+         final double elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000.;
+         log.info("TIME: " + elapsedSeconds);
+     } finally {
+         log.info("Shutting down");
+         closeQuietly(conn);
+         closeQuietly(repository);
+     }
+ }
+
+ /**
+  * Shuts down {@code repository} if it is non-null. A failure during shutdown is logged
+  * and otherwise ignored so that cleanup never masks an earlier exception.
+  *
+  * @param repository the repository to shut down; may be {@code null}
+  */
+ private static void closeQuietly(final SailRepository repository) {
+     if (repository == null) {
+         return;
+     }
+     try {
+         repository.shutDown();
+     } catch (final RepositoryException e) {
+         // Best-effort cleanup: do not propagate, but leave a trace of the failure.
+         log.warn("Failed to shut down the Sail repository.", e);
+     }
+ }
+
+ /**
+  * Closes {@code conn} if it is non-null. A failure while closing is logged and otherwise
+  * ignored so that cleanup never masks an earlier exception.
+  *
+  * @param conn the connection to close; may be {@code null}
+  */
+ private static void closeQuietly(final SailRepositoryConnection conn) {
+     if (conn == null) {
+         return;
+     }
+     try {
+         conn.close();
+     } catch (final RepositoryException e) {
+         // Best-effort cleanup: do not propagate, but leave a trace of the failure.
+         log.warn("Failed to close the Sail repository connection.", e);
+     }
+ }
+
+ private static Configuration getConf() {
+
+
+ return AccumuloIndexingConfiguration.builder()
+ .setUseMockAccumulo(USE_MOCK_INSTANCE)
+ .setAuths(AUTHS)
+ .setAccumuloUser("root")
+ .setAccumuloPassword("")
+ .setAccumuloInstance(INSTANCE)
+ .setRyaPrefix(RYA_TABLE_PREFIX)
+ .setUsePcj(true)
+ .setUseAccumuloFreetextIndex(true)
+ .setUseAccumuloTemporalIndex(true)
+ .build();
+
+ }
+
+
+
+ /**
+ * Inserts one geo:Feature whose point geometry is "Point(-77.03524 38.889468)" and then
+ * runs five geof:sfWithin queries against it, validating the solution count of each:
+ * polygons spanning longitudes -77..-76 are expected to miss the point, polygons
+ * spanning -78..-77 are expected to contain it.
+ *
+ * @param conn open connection used for the SPARQL update and the queries
+ * @throws Exception if preparing or executing the update or any query fails
+ */
+ private static void testAddPointAndWithinSearchWithPCJ(
+ final SailRepositoryConnection conn) throws Exception {
+
+ final String update = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "INSERT DATA { " //
+ + " <urn:feature> a geo:Feature ; " //
+ + " geo:hasGeometry [ " //
+ + " a geo:Point ; " //
+ + " geo:asWKT \"Point(-77.03524 38.889468)\"^^geo:wktLiteral "//
+ + " ] . " //
+ + "}";
+
+ final Update u = conn.prepareUpdate(QueryLanguage.SPARQL, update);
+ u.execute();
+
+ // Reused for every query below; a fresh handler is created per query so counts start at 0.
+ String queryString;
+ TupleQuery tupleQuery;
+ CountingResultHandler tupleHandler;
+
+ // point outside search ring
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point ?wkt " //
+ + "{" //
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("point outside search ring, Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() == 0);
+ // point inside search ring
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point ?wkt " // ?e ?l ?o" //
+ + "{" //
+// + " ?feature a ?e . "//
+// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+// + " ?e <uri:talksTo> ?o . "//
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("point inside search ring, Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() == 1);
+
+ // point inside search ring with Pre-Computed Join
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point ?wkt "//?e ?l ?o" //
+ + "{" //
+// + " ?feature a ?e . "//
+// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+// + " ?e <uri:talksTo> ?o . "//
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("point inside search ring with Pre-Computed Join, Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() >= 1); // may see points left over
+ // from previous runs
+
+ // point outside search ring with PCJ
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point ?wkt "//?e ?l ?o " //
+ + "{" //
+// + " ?feature a ?e . "//
+// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+// + " ?e <uri:talksTo> ?o . "//
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("point outside search ring with PCJ, Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() == 0);
+
+ // point inside search ring with different Pre-Computed Join
+ // (this variant drops the "?feature a geo:Feature" pattern and does not project ?wkt)
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point "//?wkt ?e ?c ?l ?o " //
+ + "{" //
+// + " ?e a ?c . "//
+// + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+ //+ " ?e <uri:talksTo> ?o . "//
+ //+ " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("point inside search ring with different Pre-Computed Join, Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() == 1);
+ }
+
+ /**
+ * Inserts one event (a time:Instant located at a geo:Feature, with six attendees) and then
+ * runs a single query combining a temporal filter (tempo:after), a geo filter
+ * (geof:sfWithin) and a free-text filter (fts:text). Five of the six attendee labels
+ * start with "Paladin", so five solutions are expected.
+ *
+ * @param conn open connection used for the SPARQL update and the query
+ */
+ private static void testTemporalFreeGeoSearch(
+ final SailRepositoryConnection conn)
+ throws MalformedQueryException, RepositoryException,
+ UpdateExecutionException, TupleQueryResultHandlerException,
+ QueryEvaluationException {
+ // Once upon a time, a meeting happened, in a place and time, attended by 5 paladins and another.
+ final String update = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "PREFIX time: <http://www.w3.org/2006/time#> "//
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> "//
+ + "PREFIX fts: <http://rdf.useekm.com/fts#> "//
+ + "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> "//
+ + "PREFIX ex: <http://example.com/#> "//
+ + "INSERT DATA { " //
+ + " ex:feature719 a geo:Feature ; " //
+ + " geo:hasGeometry [ " //
+ + " a geo:Point ; " //
+ + " geo:asWKT \"Point(-77.03524 38.889468)\"^^geo:wktLiteral "//
+ + " ] . "//
+ + " ex:event719 a time:Instant ;" //
+ + " time:inXSDDateTime '2001-01-01T01:01:04-08:00' ;" // 4 seconds
+ + " ex:locatedAt ex:feature719 ;" //
+ + " ex:attendee ex:person01;" //
+ + " ex:attendee ex:person02;" //
+ + " ex:attendee ex:person03;" //
+ + " ex:attendee [a ex:Person ; rdfs:label 'Paladin Ogier the Dane' ] ;" // Use a blank node instead of person04
+ + " ex:attendee ex:person05;" //
+ + " ex:attendee ex:person06." //
+ + " ex:person01 a ex:Person ;" //
+ + " rdfs:label \"Paladin Fossil\"." //
+ + " ex:person02 a ex:Person ;" //
+ + " rdfs:label \"Paladin Paul Denning\"." //
+ + " ex:person03 a ex:Person ;" //
+ + " rdfs:label 'Paladin Will Travel'." //
+ + " ex:person05 a ex:Person ;" //
+ + " rdfs:label 'Paladin dimethyl disulfide'." //
+ + " ex:person06 a ex:Person ;" //
+ + " rdfs:label 'Ignore me'." //
+ + "" //
+ + "}";
+ final Update u = conn.prepareUpdate(QueryLanguage.SPARQL, update);
+ u.execute();
+
+ String queryString;
+ TupleQuery tupleQuery;
+ CountingResultHandler tupleHandler;
+
+ // Find all events after a time, located in a polygon square, whose attendees have label names beginning with "Pal"
+ queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "PREFIX time: <http://www.w3.org/2006/time#> "//
+ + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> "//
+ + "PREFIX fts: <http://rdf.useekm.com/fts#> "//
+ + "PREFIX ex: <http://example.com/#> "//
+ + "SELECT ?feature ?point ?wkt ?event ?time ?person ?match" //
+ + "{" //
+ + " ?event a time:Instant ; \n"//
+ + " time:inXSDDateTime ?time ; \n"//
+ + " ex:locatedAt ?feature ;" //
+ + " ex:attendee ?person." //
+ + " FILTER(tempo:after(?time, '2001-01-01T01:01:03-08:00') ) \n"// after 3 seconds
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)). " //
+ + " ?person a ex:Person . "//
+ + " ?person <http://www.w3.org/2000/01/rdf-schema#label> ?match . "//
+ + " FILTER(fts:text(?match, \"Pal*\")) " //
+ + "}";//
+
+ tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+
+ tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("Result count : " + tupleHandler.getCount());
+ // One solution per attendee whose label matches "Pal*"; 'Ignore me' must not match.
+ Validate.isTrue(tupleHandler.getCount() == 5 );
+
+ }
+
+ /**
+ * Runs one query that joins a free-text match on person labels, a generic
+ * type/label/talksTo pattern, and a geof:sfWithin filter, and validates that it
+ * currently yields no solutions.
+ *
+ * NOTE(review): nothing in this example class appears to insert
+ * {@code <http://example.org/ontology/Person>} or {@code <uri:talksTo>} triples
+ * (testTemporalFreeGeoSearch uses the {@code http://example.com/#} namespace), which
+ * would explain the zero count flagged by the TODO below - confirm.
+ *
+ * @param conn open connection used for the query
+ */
+ private static void testGeoFreetextWithPCJSearch(
+ final SailRepositoryConnection conn)
+ throws MalformedQueryException, RepositoryException,
+ TupleQueryResultHandlerException, QueryEvaluationException {
+ // ring outside point
+ final String queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+ + "PREFIX fts: <http://rdf.useekm.com/fts#> "//
+ + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+ + "SELECT ?feature ?point ?wkt ?e ?c ?l ?o ?person ?match " //
+ + "{" //
+ + " ?person a <http://example.org/ontology/Person> . "//
+ + " ?person <http://www.w3.org/2000/01/rdf-schema#label> ?match . "//
+ + " FILTER(fts:text(?match, \"!alice & hose\")) " //
+ + " ?e a ?c . "//
+ + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "//
+ + " ?e <uri:talksTo> ?o . "//
+ + " ?feature a geo:Feature . "//
+ + " ?feature geo:hasGeometry ?point . "//
+ + " ?point a geo:Point . "//
+ + " ?point geo:asWKT ?wkt . "//
+ + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " //
+ + "}";//
+ final TupleQuery tupleQuery = conn.prepareTupleQuery(
+ QueryLanguage.SPARQL, queryString);
+ final CountingResultHandler tupleHandler = new CountingResultHandler();
+ tupleQuery.evaluate(tupleHandler);
+ log.info("Result count : " + tupleHandler.getCount());
+ Validate.isTrue(tupleHandler.getCount() == 0);// TODO ==1 some data is missing for this query!
+ }
+
+
+
+ /**
+  * Deletes every feature/point/WKT combination matched by the geo pattern and then
+  * validates that a query for the same pattern returns no solutions.
+  *
+  * @param conn open connection used for the SPARQL delete and the follow-up query
+  * @throws Exception if preparing or executing the update or the query fails
+  */
+ private static void testDeleteGeoData(final SailRepositoryConnection conn)
+         throws Exception {
+     // Fragments shared by the DELETE, its WHERE clause and the verification query.
+     final String prefixes = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "
+             + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> ";
+     final String pointPattern = " ?feature a geo:Feature . "
+             + " ?feature geo:hasGeometry ?point . "
+             + " ?point a geo:Point . "
+             + " ?point geo:asWKT ?wkt . ";
+
+     // Delete all stored points
+     final String deleteAllPoints = prefixes
+             + "DELETE {\n"
+             + pointPattern
+             + "}\n"
+             + "WHERE { \n"
+             + pointPattern
+             + "}";
+     conn.prepareUpdate(QueryLanguage.SPARQL, deleteAllPoints).execute();
+
+     // Find all stored points
+     final String selectAllPoints = prefixes
+             + "SELECT ?feature ?point ?wkt "
+             + "{"
+             + pointPattern
+             + "}";
+     final TupleQuery remainingPoints = conn.prepareTupleQuery(QueryLanguage.SPARQL, selectAllPoints);
+     final CountingResultHandler counter = new CountingResultHandler();
+     remainingPoints.evaluate(counter);
+     log.info("Result count : " + counter.getCount());
+     Validate.isTrue(counter.getCount() == 0);
+ }
+
+ /**
+  * Result handler that prints each solution to standard output and keeps a running
+  * count of how many solutions it has been handed. All other callbacks are no-ops.
+  */
+ private static class CountingResultHandler implements TupleQueryResultHandler {
+     /** Number of solutions received since construction or the last reset. */
+     private int solutionsSeen = 0;
+
+     /** @return the number of solutions handled so far */
+     public int getCount() {
+         return solutionsSeen;
+     }
+
+     /** Sets the solution count back to zero. */
+     public void resetCount() {
+         solutionsSeen = 0;
+     }
+
+     @Override
+     public void startQueryResult(final List<String> bindingNames)
+             throws TupleQueryResultHandlerException {
+         // nothing to prepare
+     }
+
+     @Override
+     public void handleSolution(final BindingSet solution)
+             throws TupleQueryResultHandlerException {
+         solutionsSeen += 1;
+         System.out.println(solution);
+     }
+
+     @Override
+     public void endQueryResult() throws TupleQueryResultHandlerException {
+         // nothing to finish
+     }
+
+     @Override
+     public void handleBoolean(final boolean value)
+             throws QueryResultHandlerException {
+         // boolean results are not counted
+     }
+
+     @Override
+     public void handleLinks(final List<String> links)
+             throws QueryResultHandlerException {
+         // links are not counted
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java
new file mode 100644
index 0000000..f36a515
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveFeatureReaderTest.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.geotools.data.DataStore;
+import org.geotools.data.DataUtilities;
+import org.geotools.data.DefaultTransaction;
+import org.geotools.data.DelegatingFeatureReader;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.FeatureWriter;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.feature.SchemaException;
+import org.geotools.feature.visitor.MaxVisitor;
+import org.geotools.feature.visitor.MinVisitor;
+import org.geotools.filter.FilterFactoryImpl;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+
+import com.google.common.collect.Sets;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveFeatureReader;
+import mil.nga.giat.geowave.adapter.vector.utils.DateUtilities;
+
+/**
+ * Tests the {@link FeatureReader} capabilities within the
+ * {@link GeoWaveGeoIndexer}.
+ */
+public class GeoWaveFeatureReaderTest {
+ private DataStore dataStore;
+ private SimpleFeatureType type;
+ private final GeometryFactory factory = new GeometryFactory(new PrecisionModel(PrecisionModel.FIXED));
+ private Query query = null;
+ private final List<String> fids = new ArrayList<>();
+ private final List<String> pids = new ArrayList<>();
+ private Date stime, etime;
+
+ private AccumuloRdfConfiguration conf;
+
+ private void setupConf() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ conf = new AccumuloRdfConfiguration();
+ conf.setTablePrefix("triplestore_");
+ final String tableName = GeoWaveGeoIndexer.getTableName(conf);
+ conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+ conf.set(ConfigUtils.CLOUDBASE_USER, "USERNAME");
+ conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "PASS");
+ conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "INSTANCE");
+ conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, "localhost");
+ conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U");
+ conf.set(OptionalConfigUtils.USE_GEO, "true");
+ conf.set(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE.toString());
+
+ final TableOperations tops = ConfigUtils.getConnector(conf).tableOperations();
+ // get all of the table names with the prefix
+ final Set<String> toDel = Sets.newHashSet();
+ for (final String t : tops.list()) {
+ if (t.startsWith(tableName)) {
+ toDel.add(t);
+ }
+ }
+ for (final String t : toDel) {
+ tops.delete(t);
+ }
+ }
+
+ /**
+  * Prepares the fixture: obtains the GeoTools data store from a GeoWave indexer, purges
+  * old data, creates the "GeoWaveFeatureReaderTest" schema, writes two point features
+  * (recording their feature ids and pids), and builds the default query that selects
+  * the first feature by id.
+  */
+ @Before
+ public void setup() throws SchemaException, CQLException, Exception {
+     setupConf();
+     try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+         indexer.setConf(conf);
+         dataStore = indexer.getGeoToolsDataStore();
+         // Clear old data
+         indexer.purge(conf);
+
+         type = DataUtilities.createType(
+                 "GeoWaveFeatureReaderTest",
+                 "geometry:Geometry:srid=4326,start:Date,end:Date,pop:java.lang.Long,pid:String");
+         dataStore.createSchema(type);
+
+         stime = DateUtilities.parseISO("2005-05-15T20:32:56Z");
+         etime = DateUtilities.parseISO("2005-05-20T20:32:56Z");
+
+         final Transaction tx = new DefaultTransaction();
+         final FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
+                 dataStore.getFeatureWriter(type.getTypeName(), tx);
+         assertFalse(writer.hasNext());
+
+         // First feature: has both a start and an end time.
+         final SimpleFeature first = writer.next();
+         first.setAttribute("pop", Long.valueOf(100));
+         first.setAttribute("pid", "a" + UUID.randomUUID().toString());
+         first.setAttribute("start", stime);
+         first.setAttribute("end", etime);
+         first.setAttribute("geometry", factory.createPoint(new Coordinate(27.25, 41.25)));
+         fids.add(first.getID());
+         pids.add(first.getAttribute("pid").toString());
+         writer.write();
+
+         // Second feature: starts at etime and has no end time set.
+         final SimpleFeature second = writer.next();
+         second.setAttribute("pop", Long.valueOf(101));
+         second.setAttribute("pid", "b" + UUID.randomUUID().toString());
+         second.setAttribute("start", etime);
+         second.setAttribute("geometry", factory.createPoint(new Coordinate(28.25, 41.25)));
+         fids.add(second.getID());
+         pids.add(second.getAttribute("pid").toString());
+         writer.write();
+
+         writer.close();
+         tx.commit();
+         tx.close();
+
+         final String[] projectedAttributes = { "geometry", "pid" };
+         query = new Query(
+                 "GeoWaveFeatureReaderTest",
+                 ECQL.toFilter("IN ('" + fids.get(0) + "')"),
+                 projectedAttributes);
+     }
+ }
+
+ /**
+ * Disposes of the GeoTools data store that {@link #setup()} obtained from the indexer.
+ */
+ @After
+ public void tearDown() {
+ dataStore.dispose();
+ }
+
+ /**
+  * Reads with the default feature-id query and checks that at least one feature comes
+  * back and that every returned feature id is one that was written during setup.
+  */
+ @Test
+ public void testFID() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException {
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     assertTrue(count > 0);
+ }
+
+ /**
+  * Queries with an "IN (...)" filter listing every written feature id and checks that
+  * exactly that many features, all with known ids, are returned.
+  */
+ @Test
+ public void testFidFilterQuery() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException {
+     final String fidsString = fids.stream().collect(Collectors.joining("','", "'", "'"));
+     final Filter filter = ECQL.toFilter("IN (" + fidsString + ")");
+     final Query query = new Query(
+             "GeoWaveFeatureReaderTest",
+             filter,
+             new String[] { "geometry", "pid" });
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     // assertEquals reports both values on failure, unlike assertTrue(a == b).
+     assertEquals(fids.size(), count);
+ }
+
+ /**
+  * Queries with a "pid IN (...)" filter that lists every pid except the first and checks
+  * that one fewer feature than was written is returned.
+  */
+ @Test
+ public void testPidFilterQuery() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException {
+     // Filter it so that it only queries for everything but the first pid.
+     // There's only 2 pids total so it should just return the second one.
+     final String pidsString = pids.subList(1, pids.size()).stream().collect(Collectors.joining("','", "'", "'"));
+     final Filter filter = ECQL.toFilter("pid IN (" + pidsString + ")");
+     final Query query = new Query(
+             "GeoWaveFeatureReaderTest",
+             filter,
+             new String[] { "geometry", "pid" });
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     // assertEquals reports both values on failure, unlike assertTrue(a == b).
+     assertEquals(pids.size() - 1, count);
+ }
+
+
+ /**
+  * Queries with a bounding box of -180..180 by -90..90 in EPSG:4326 and checks that at
+  * least one feature comes back and that every returned id was written during setup.
+  */
+ @Test
+ public void testBBOX() throws IllegalArgumentException, NoSuchElementException, IOException {
+     // Named so that it does not shadow the GeometryFactory field "factory".
+     final FilterFactoryImpl filterFactory = new FilterFactoryImpl();
+     final Query query = new Query(
+             "GeoWaveFeatureReaderTest",
+             filterFactory.bbox("", -180, -90, 180, 90, "EPSG:4326"),
+             new String[] { "geometry", "pid" });
+
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     assertTrue(count > 0);
+ }
+
+ /**
+  * Reads with the default feature-id query and checks that exactly one feature, with a
+  * known id, is returned.
+  */
+ @Test
+ public void testRangeIndex() throws IllegalArgumentException, NoSuchElementException, IOException {
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     assertEquals(1, count);
+ }
+
+ /**
+  * Queries with a "pid like '<first character of the first pid>%'" filter and checks that
+  * exactly one feature, with a known id, is returned.
+  */
+ @Test
+ public void testLike() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException {
+     final Query query = new Query(
+             "GeoWaveFeatureReaderTest",
+             ECQL.toFilter("pid like '" + pids.get(0).substring(0, 1) + "%'"),
+             new String[] { "geometry", "pid" });
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Always release the reader, even when an assertion above fails.
+         reader.close();
+     }
+     assertEquals(1, count);
+ }
+
+ /**
+  * Confirms that a "pid like" query finds one feature, removes every feature through a
+  * feature writer, and then confirms that the same query finds nothing.
+  */
+ @Test
+ public void testRemoveFeature() throws IllegalArgumentException, NoSuchElementException, IOException, CQLException {
+     final Query query = new Query(
+             "GeoWaveFeatureReaderTest",
+             ECQL.toFilter("pid like '" + pids.get(0).substring(0, 1) + "%'"),
+             new String[] { "geometry", "pid" });
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int count = 0;
+     try {
+         while (reader.hasNext()) {
+             final SimpleFeature feature = reader.next();
+             assertTrue(fids.contains(feature.getID()));
+             count++;
+         }
+     } finally {
+         // Release the first reader before the writer starts removing features.
+         reader.close();
+     }
+     assertEquals(1, count);
+
+     // Remove
+     final FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
+             dataStore.getFeatureWriter(type.getTypeName(), Transaction.AUTO_COMMIT);
+     try {
+         while (writer.hasNext()) {
+             writer.next();
+             writer.remove();
+         }
+     } finally {
+         writer.close();
+     }
+
+     // Re-query
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader2 =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     int recount = 0;
+     try {
+         while (reader2.hasNext()) {
+             reader2.next();
+             recount++;
+         }
+     } finally {
+         reader2.close();
+     }
+     assertEquals(0, recount);
+ }
+
+ /**
+  * Runs a {@link MaxVisitor} over the "start" attribute of the underlying GeoWave
+  * reader's feature collection and checks that the maximum equals {@code etime}.
+  */
+ @Test
+ public void testMax() throws IllegalArgumentException, NoSuchElementException, IOException {
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     final MaxVisitor visitor = new MaxVisitor("start", type);
+     try {
+         unwrapDelegatingFeatureReader(reader).getFeatureCollection().accepts(visitor, null);
+     } finally {
+         // Always release the reader, even when visiting fails.
+         reader.close();
+     }
+     assertTrue(visitor.getMax().equals(etime));
+ }
+
+ /**
+  * Runs a {@link MinVisitor} over the "start" attribute of the underlying GeoWave
+  * reader's feature collection and checks that the minimum equals {@code stime}.
+  */
+ @Test
+ public void testMin() throws IllegalArgumentException, NoSuchElementException, IOException {
+     final FeatureReader<SimpleFeatureType, SimpleFeature> reader =
+             dataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+     final MinVisitor visitor = new MinVisitor("start", type);
+     try {
+         unwrapDelegatingFeatureReader(reader).getFeatureCollection().accepts(visitor, null);
+     } finally {
+         // Always release the reader, even when visiting fails.
+         reader.close();
+     }
+     assertTrue(visitor.getMin().equals(stime));
+ }
+
+ /**
+  * Follows the chain of {@link DelegatingFeatureReader} wrappers starting at
+  * {@code reader} until a {@link GeoWaveFeatureReader} is reached.
+  *
+  * @param reader the possibly wrapped reader
+  * @return the innermost GeoWave reader
+  */
+ private GeoWaveFeatureReader unwrapDelegatingFeatureReader(final FeatureReader<SimpleFeatureType, SimpleFeature> reader ) {
+     // GeoTools wraps FeatureReaders in decorators; peel them off one layer at a time.
+     FeatureReader<SimpleFeatureType, SimpleFeature> candidate = reader;
+     while (true) {
+         if (candidate instanceof GeoWaveFeatureReader) {
+             return (GeoWaveFeatureReader) candidate;
+         }
+         candidate = ((DelegatingFeatureReader<SimpleFeatureType, SimpleFeature>) candidate).getDelegate();
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java
new file mode 100644
index 0000000..778b5ef
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGTQueryTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.commons.io.FileUtils;
+import org.geotools.feature.AttributeTypeBuilder;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Envelope;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.Polygon;
+
+import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter;
+import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery;
+import mil.nga.giat.geowave.core.geotime.GeometryUtils;
+import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider;
+import mil.nga.giat.geowave.core.geotime.store.query.SpatialQuery;
+import mil.nga.giat.geowave.core.store.CloseableIterator;
+import mil.nga.giat.geowave.core.store.DataStore;
+import mil.nga.giat.geowave.core.store.IndexWriter;
+import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
+import mil.nga.giat.geowave.core.store.query.QueryOptions;
+import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore;
+import mil.nga.giat.geowave.datastore.accumulo.BasicAccumuloOperations;
+import mil.nga.giat.geowave.datastore.accumulo.minicluster.MiniAccumuloClusterFactory;
+
+/**
+ * This class is intended to provide a self-contained, easy-to-follow example of
+ * a few GeoTools queries against GeoWave. For simplicity, a MiniAccumuloCluster
+ * is spun up and a few points from the DC area are ingested (Washington
+ * Monument, White House, FedEx Field). Three queries are executed against this
+ * data set: a CQL filter query, a bounding box query, and a polygon query.
+ */
+public class GeoWaveGTQueryTest {
+    // Shared fixture state, assigned in setup() and released in cleanup().
+    private static File tempAccumuloDir;
+    private static MiniAccumuloClusterImpl accumulo;
+    private static DataStore dataStore;
+
+    private static final PrimaryIndex INDEX = new SpatialDimensionalityTypeProvider().createPrimaryIndex();
+
+    // Points (to be ingested into GeoWave Data Store), given as (x, y) = (longitude, latitude)
+    private static final Coordinate WASHINGTON_MONUMENT = new Coordinate(-77.0352, 38.8895);
+    private static final Coordinate WHITE_HOUSE = new Coordinate(-77.0366, 38.8977);
+    private static final Coordinate FEDEX_FIELD = new Coordinate(-76.8644, 38.9078);
+
+    // cities used to construct Geometries for queries
+    private static final Coordinate BALTIMORE = new Coordinate(-76.6167, 39.2833);
+    private static final Coordinate RICHMOND = new Coordinate(-77.4667, 37.5333);
+    private static final Coordinate HARRISONBURG = new Coordinate(-78.8689, 38.4496);
+
+    // Location name -> point; the name is used both as an attribute and as the feature ID.
+    private static final Map<String, Coordinate> CANNED_DATA = ImmutableMap.of(
+        "Washington Monument", WASHINGTON_MONUMENT,
+        "White House", WHITE_HOUSE,
+        "FedEx Field", FEDEX_FIELD
+    );
+
+    private static final FeatureDataAdapter ADAPTER = new FeatureDataAdapter(getPointSimpleFeatureType());
+
+    private static final String ACCUMULO_USER = "root";
+    private static final String ACCUMULO_PASSWORD = "password";
+    private static final String TABLE_NAMESPACE = "";
+
+    /**
+     * Starts a MiniAccumuloCluster in a fresh temporary directory, creates the
+     * GeoWave data store on top of it, and ingests the canned data.
+     */
+    @BeforeClass
+    public static void setup() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
+        tempAccumuloDir = Files.createTempDir();
+
+        accumulo = MiniAccumuloClusterFactory.newAccumuloCluster(
+            new MiniAccumuloConfigImpl(tempAccumuloDir, ACCUMULO_PASSWORD),
+            GeoWaveGTQueryTest.class);
+
+        accumulo.start();
+
+        dataStore = new AccumuloDataStore(
+            new BasicAccumuloOperations(
+                accumulo.getZooKeepers(),
+                accumulo.getInstanceName(),
+                ACCUMULO_USER,
+                ACCUMULO_PASSWORD,
+                TABLE_NAMESPACE));
+
+        ingestCannedData();
+    }
+
+    /**
+     * Builds one SimpleFeature per entry of {@link #CANNED_DATA} and writes them
+     * all to the data store using {@link #ADAPTER} and {@link #INDEX}.
+     */
+    private static void ingestCannedData() throws IOException {
+        final List<SimpleFeature> points = new ArrayList<>();
+
+        System.out.println("Building SimpleFeatures from canned data set...");
+
+        for (final Entry<String, Coordinate> entry : CANNED_DATA.entrySet()) {
+            System.out.println("Added point: " + entry.getKey());
+            points.add(buildSimpleFeature(entry.getKey(), entry.getValue()));
+        }
+
+        System.out.println("Ingesting canned data...");
+
+        // The writer is closed by try-with-resources once every point is written.
+        try (final IndexWriter<SimpleFeature> indexWriter = dataStore.createWriter(ADAPTER, INDEX)) {
+            for (final SimpleFeature sf : points) {
+                indexWriter.write(sf);
+            }
+        }
+
+        System.out.println("Ingest complete.");
+    }
+
+    /**
+     * Prints the ID of every remaining feature in {@code matches} and returns how
+     * many there were. The iterator is not closed here; the caller owns it.
+     *
+     * @param matches the query results to drain
+     * @return the number of features that were read
+     */
+    private static int printAndCountMatches(final CloseableIterator<SimpleFeature> matches) {
+        int count = 0;
+        while (matches.hasNext()) {
+            System.out.println("Query match: " + matches.next().getID());
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * Queries with a CQL filter combining a bounding box and a name prefix.
+     */
+    @Test
+    public void executeCQLQueryTest() throws IOException, CQLException {
+        System.out.println("Executing query, expecting to match two points...");
+
+        // FedEx Field lies inside this box too, but is excluded by the name predicate.
+        final Filter cqlFilter = ECQL.toFilter("BBOX(geometry,-77.6167,38.6833,-76.6,38.9200) and locationName like 'W%'");
+
+        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
+        final CQLQuery cqlQuery = new CQLQuery(null, cqlFilter, ADAPTER);
+
+        try (final CloseableIterator<SimpleFeature> iterator = dataStore.query(queryOptions, cqlQuery)) {
+            final int count = printAndCountMatches(iterator);
+            System.out.println("executeCQLQueryTest count: " + count);
+            // Should match "Washington Monument" and "White House"
+            assertEquals(2, count);
+        }
+    }
+
+    /**
+     * Queries with the envelope whose opposite corners are Baltimore and Richmond.
+     */
+    @Test
+    public void executeBoundingBoxQueryTest() throws IOException {
+        System.out.println("Constructing bounding box for the area contained by [Baltimore, MD and Richmond, VA].");
+
+        final Geometry boundingBox = GeometryUtils.GEOMETRY_FACTORY.toGeometry(new Envelope(
+            BALTIMORE,
+            RICHMOND));
+
+        System.out.println("Executing query, expecting to match ALL points...");
+
+        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
+        final SpatialQuery spatialQuery = new SpatialQuery(boundingBox);
+
+        try (final CloseableIterator<SimpleFeature> iterator = dataStore.query(queryOptions, spatialQuery)) {
+            final int count = printAndCountMatches(iterator);
+            System.out.println("executeBoundingBoxQueryTest count: " + count);
+            // Should match "FedEx Field", "Washington Monument", and "White House"
+            assertEquals(3, count);
+        }
+    }
+
+    /**
+     * Queries with the triangle Baltimore - Richmond - Harrisonburg.
+     */
+    @Test
+    public void executePolygonQueryTest() throws IOException {
+        System.out.println("Constructing polygon for the area contained by [Baltimore, MD; Richmond, VA; Harrisonburg, VA].");
+
+        // The ring is closed by repeating the first coordinate at the end.
+        final Polygon polygon = GeometryUtils.GEOMETRY_FACTORY.createPolygon(new Coordinate[] {
+            BALTIMORE,
+            RICHMOND,
+            HARRISONBURG,
+            BALTIMORE
+        });
+
+        System.out.println("Executing query, expecting to match ALL points...");
+
+        final QueryOptions queryOptions = new QueryOptions(ADAPTER, INDEX);
+        final SpatialQuery spatialQuery = new SpatialQuery(polygon);
+
+        /*
+         * NOTICE: In this query, the adapter is added to the query options. If
+         * an index has data from more than one adapter, the data associated
+         * with a specific adapter can be selected.
+         */
+        try (final CloseableIterator<SimpleFeature> closableIterator = dataStore.query(queryOptions, spatialQuery)) {
+            final int count = printAndCountMatches(closableIterator);
+            System.out.println("executePolygonQueryTest count: " + count);
+            // Should match "FedEx Field", "Washington Monument", and "White House"
+            assertEquals(3, count);
+        }
+    }
+
+    /**
+     * Stops the mini cluster and deletes its temporary directory.
+     */
+    @AfterClass
+    public static void cleanup() throws IOException, InterruptedException {
+        // Both fields are still null if setup() failed early; guarding them keeps
+        // cleanup from adding an unrelated NullPointerException to the real failure.
+        try {
+            if (accumulo != null) {
+                accumulo.stop();
+            }
+        } finally {
+            if (tempAccumuloDir != null) {
+                FileUtils.deleteDirectory(tempAccumuloDir);
+            }
+        }
+    }
+
+    /**
+     * Builds the feature type used by this test: a non-nillable String attribute
+     * "locationName" and a non-nillable Geometry attribute "geometry".
+     */
+    private static SimpleFeatureType getPointSimpleFeatureType() {
+        final String name = "PointSimpleFeatureType";
+        final SimpleFeatureTypeBuilder sftBuilder = new SimpleFeatureTypeBuilder();
+        final AttributeTypeBuilder atBuilder = new AttributeTypeBuilder();
+        sftBuilder.setName(name);
+        sftBuilder.add(atBuilder.binding(String.class).nillable(false)
+            .buildDescriptor("locationName"));
+        sftBuilder.add(atBuilder.binding(Geometry.class).nillable(false)
+            .buildDescriptor("geometry"));
+
+        return sftBuilder.buildFeatureType();
+    }
+
+    /**
+     * Builds a point feature for the given location.
+     *
+     * @param locationName value of the "locationName" attribute, also used as the feature ID
+     * @param coordinate position of the point stored in the "geometry" attribute
+     * @return the populated feature
+     */
+    private static SimpleFeature buildSimpleFeature(final String locationName, final Coordinate coordinate) {
+        final SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getPointSimpleFeatureType());
+        builder.set("locationName", locationName);
+        builder.set("geometry", GeometryUtils.GEOMETRY_FACTORY.createPoint(coordinate));
+
+        return builder.buildFeature(locationName);
+    }
+}
\ No newline at end of file