You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/08/30 20:31:48 UTC
[10/14] incubator-rya git commit: RYA-324,
RYA-272 Geo refactoring and examples closes #182
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerSfTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerSfTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerSfTest.java
new file mode 100644
index 0000000..0cf2544
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerSfTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static org.apache.rya.indexing.GeoIndexingTestUtils.getSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.commons.io.FileUtils;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.geotools.geometry.jts.Geometries;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.LineString;
+import com.vividsolutions.jts.geom.LinearRing;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.Polygon;
+import com.vividsolutions.jts.geom.PrecisionModel;
+import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.gml2.GMLWriter;
+
+import info.aduna.iteration.CloseableIteration;
+import mil.nga.giat.geowave.datastore.accumulo.minicluster.MiniAccumuloClusterFactory;
+
+/**
+ * Tests all of the "simple functions" of the geoindexer specific to GML.
+ * Parameterized so that each test is run for WKT and for GML.
+ */
+@RunWith(value = Parameterized.class)
+public class GeoWaveIndexerSfTest {
+ private static AccumuloRdfConfiguration conf;
+ private static GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+ private static GeoWaveGeoIndexer g;
+
+ private static final StatementConstraints EMPTY_CONSTRAINTS = new StatementConstraints();
+
+ // Here is the landscape:
+ /**
+ * <pre>
+ *  2---+---+---+---+---+---+
+ *  |        F      |G      |
+ *  1  A    o(-1,1) o   C   |
+ *  |               |       |
+ *  0---+---+       +---+---+(3,0)
+ *  |       |    E  |
+ * -1   B   +   .---+---+
+ *  |       |  /|   |   |
+ * -2---+---+-/-+---+   +
+ *  ^        /  |     D |
+ * -3  -2  -1   0---1---2   3   4
+ * </pre>
+ **/
+ private static final Polygon A = poly(bbox(-3, -2, 1, 2));
+ private static final Polygon B = poly(bbox(-3, -2, -1, 0));
+ private static final Polygon C = poly(bbox(1, 0, 3, 2));
+ private static final Polygon D = poly(bbox(0, -3, 2, -1));
+
+ private static final Point F = point(-1, 1);
+ private static final Point G = point(1, 1);
+
+ private static final LineString E = line(-1, -3, 0, -1);
+
+ private static final Map<Geometry, String> NAMES = ImmutableMap.<Geometry, String>builder()
+ .put(A, "A")
+ .put(B, "B")
+ .put(C, "C")
+ .put(D, "D")
+ .put(E, "E")
+ .put(F, "F")
+ .put(G, "G")
+ .build();
+
+ private static File tempAccumuloDir;
+ private static MiniAccumuloClusterImpl accumulo;
+
+ private static final boolean IS_MOCK = true;
+
+ private static final String ACCUMULO_USER = IS_MOCK ? "username" : "root";
+ private static final String ACCUMULO_PASSWORD = "password";
+
+ /**
+ * The JUnit 4 Parameterized runner iterates through the list returned by {@link #constructorData()}
+ * and calls the constructor once per entry. For each test, the constructor is called three times:
+ * for WKT, for GML encoding 1, and for GML encoding 2.
+ */
+ private static final URI USE_JTS_LIB_ENCODING = new URIImpl("uri:useLib") ;
+ private static final URI USE_ROUGH_ENCODING = new URIImpl("uri:useRough") ;
+
+    /**
+     * Supplies the constructor arguments used by the JUnit 4 {@link Parameterized} runner.
+     * Each row is a {schema, encoding method} pair and every test is run once per row.
+     *
+     * @return three parameter rows: one WKT row followed by two GML rows.
+     */
+    @Parameters
+    public static Collection<URI[]> constructorData() {
+        final URI[] wktRow = { GeoConstants.XMLSCHEMA_OGC_WKT, USE_JTS_LIB_ENCODING };
+        final URI[] gmlRow = { GeoConstants.XMLSCHEMA_OGC_GML, USE_JTS_LIB_ENCODING };
+        // NOTE(review): this row repeats USE_JTS_LIB_ENCODING, so USE_ROUGH_ENCODING is never
+        // handed to the constructor -- confirm whether that is intentional.
+        final URI[] secondGmlRow = { GeoConstants.XMLSCHEMA_OGC_GML, USE_JTS_LIB_ENCODING };
+        return Arrays.asList(wktRow, gmlRow, secondGmlRow);
+    }
+
+ private final URI schemaToTest;
+ private final URI encodeMethod;
+
+    /**
+     * Creates one parameterized test instance. The JUnit parameterized runner invokes this
+     * with each row produced by {@link #constructorData()}.
+     *
+     * @param schemaToTest the geometry literal schema {@link URI} the instance stores and queries with.
+     * @param encodeMethod the {@link URI} selecting how geometries are encoded when the schema is GML.
+     */
+    public GeoWaveIndexerSfTest(final URI schemaToTest, final URI encodeMethod) {
+        this.encodeMethod = encodeMethod;
+        this.schemaToTest = schemaToTest;
+    }
+
+    /**
+     * Runs once before all tests. When {@link #IS_MOCK} is false, creates a temporary
+     * directory and starts a mini Accumulo cluster in it; otherwise does nothing.
+     *
+     * @throws AccumuloException declared for cluster start-up.
+     * @throws AccumuloSecurityException declared for cluster start-up.
+     * @throws IOException if the cluster cannot be created or started.
+     * @throws InterruptedException if start-up is interrupted.
+     */
+    @BeforeClass
+    public static void setup() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
+        if (!IS_MOCK) {
+            tempAccumuloDir = Files.createTempDir();
+
+            // The context class handed to the factory is this test class itself.
+            accumulo = MiniAccumuloClusterFactory.newAccumuloCluster(
+                    new MiniAccumuloConfigImpl(tempAccumuloDir, ACCUMULO_PASSWORD),
+                    GeoWaveIndexerSfTest.class);
+
+            accumulo.start();
+        }
+    }
+
+    /**
+     * Runs once after all tests. When {@link #IS_MOCK} is false, stops the mini Accumulo
+     * cluster and deletes its temporary directory. Both steps are skipped for whatever
+     * {@link #setup()} did not manage to create, so a failed setup is not masked by a
+     * {@link NullPointerException} thrown from here.
+     *
+     * @throws IOException if the cluster cannot be stopped or the directory cannot be deleted.
+     * @throws InterruptedException if stopping the cluster is interrupted.
+     */
+    @AfterClass
+    public static void cleanup() throws IOException, InterruptedException {
+        if (!IS_MOCK) {
+            try {
+                if (accumulo != null) {
+                    accumulo.stop();
+                }
+            } finally {
+                // Always remove the temp directory, even when stopping the cluster failed.
+                if (tempAccumuloDir != null) {
+                    FileUtils.deleteDirectory(tempAccumuloDir);
+                }
+            }
+        }
+    }
+
+    /**
+     * Runs before each test method. Builds a fresh configuration, deletes every table whose
+     * name starts with the GeoWave index table name, then creates and purges a new indexer
+     * and stores the seven landscape geometries in it using the schema and encoding under test.
+     *
+     * @throws Exception if the connector, table operations or indexer fail.
+     */
+    @Before
+    public void before() throws Exception {
+        conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("triplestore_");
+        final String geoTableName = GeoWaveGeoIndexer.getTableName(conf);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, IS_MOCK);
+        conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
+        final String instanceName = IS_MOCK ? "INSTANCE" : accumulo.getInstanceName();
+        final String zooKeepers = IS_MOCK ? "localhost" : accumulo.getZooKeepers();
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooKeepers);
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U");
+        conf.set(OptionalConfigUtils.USE_GEO, "true");
+        conf.set(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE.toString());
+
+        // Collect the matching table names first, then delete them in a second pass.
+        final TableOperations tableOps = ConfigUtils.getConnector(conf).tableOperations();
+        final Set<String> staleTables = Sets.newHashSet();
+        for (final String existing : tableOps.list()) {
+            if (!existing.startsWith(geoTableName)) {
+                continue;
+            }
+            staleTables.add(existing);
+        }
+        for (final String stale : staleTables) {
+            tableOps.delete(stale);
+        }
+
+        g = new GeoWaveGeoIndexer();
+        g.setConf(conf);
+        g.purge(conf);
+
+        // Store every landscape geometry as WKT or GML, depending on the test parameters.
+        final Geometry[] landscape = { A, B, C, D, F, E, G };
+        for (final Geometry shape : landscape) {
+            g.storeStatement(createRyaStatement(shape, schemaToTest, encodeMethod));
+        }
+    }
+
+ private static RyaStatement createRyaStatement(final Geometry geo, final URI schema, final URI encodingMethod) {
+ return RdfToRyaConversions.convertStatement(genericStatement(geo,schema,encodingMethod));
+ }
+
+    /**
+     * Builds the RDF statement for a geometry in the requested schema.
+     *
+     * @param geo the geometry to encode.
+     * @param schema either the WKT or the GML schema {@link URI}.
+     * @param encodingMethod the GML encoding selector; only used for the GML schema.
+     * @return the generated statement.
+     * @throws Error if the schema is neither WKT nor GML.
+     */
+    private static Statement genericStatement(final Geometry geo, final URI schema, final URI encodingMethod) {
+        if (schema.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) {
+            return genericStatementWkt(geo);
+        }
+        if (schema.equals(GeoConstants.XMLSCHEMA_OGC_GML)) {
+            return genericStatementGml(geo, encodingMethod);
+        }
+        throw new Error("schema unsupported: "+schema);
+    }
+
+    /**
+     * Builds a statement whose subject is {@code uri:<name>} (name looked up in {@link #NAMES}),
+     * whose predicate is {@link GeoConstants#GEO_AS_WKT} and whose object is the geometry's
+     * {@code toString()} text typed as a WKT literal.
+     *
+     * @param geo the geometry to encode.
+     * @return the WKT statement.
+     */
+    private static Statement genericStatementWkt(final Geometry geo) {
+        final ValueFactory factory = new ValueFactoryImpl();
+        return new StatementImpl(
+                factory.createURI("uri:" + NAMES.get(geo)),
+                GeoConstants.GEO_AS_WKT,
+                factory.createLiteral(geo.toString(), GeoConstants.XMLSCHEMA_OGC_WKT));
+    }
+
+    /**
+     * Builds a statement whose subject is {@code uri:<name>} (name looked up in {@link #NAMES}),
+     * whose predicate is {@link GeoConstants#GEO_AS_GML} and whose object is the geometry
+     * encoded as a GML literal by the selected encoder.
+     *
+     * @param geo the geometry to encode.
+     * @param encodingMethod {@link #USE_JTS_LIB_ENCODING} or {@link #USE_ROUGH_ENCODING}.
+     * @return the GML statement.
+     * @throws Error if the encoding method is neither of the two known selectors.
+     */
+    private static Statement genericStatementGml(final Geometry geo, final URI encodingMethod) {
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Resource subject = vf.createURI("uri:" + NAMES.get(geo));
+        final URI predicate = GeoConstants.GEO_AS_GML;
+
+        // Compare the selector by value (as genericStatement does for the schema) so that an
+        // equal URI that is not the very same object is still recognised.
+        final String gml;
+        if (USE_JTS_LIB_ENCODING.equals(encodingMethod)) {
+            gml = geoToGmlUseJtsLib(geo);
+        } else if (USE_ROUGH_ENCODING.equals(encodingMethod)) {
+            gml = geoToGmlRough(geo);
+        } else {
+            throw new Error("invalid encoding method: "+encodingMethod);
+        }
+
+        final Value object = vf.createLiteral(gml, GeoConstants.XMLSCHEMA_OGC_GML);
+        return new StatementImpl(subject, predicate, object);
+    }
+
+    /**
+     * Converts a geometry to GML using the JTS library's {@link GMLWriter}.
+     * The writer is configured without namespace and prefix. An {@code EPSG:<srid>} srsName is
+     * set only when the geometry carries a real SRID, i.e. one that is neither -1 nor 0.
+     *
+     * @param geo the geometry to encode.
+     * @return the GML encoding of the geometry, with {@code outerBoundaryIs} replaced by {@code exterior}.
+     */
+    private static String geoToGmlUseJtsLib(final Geometry geo) {
+        final int srid = geo.getSRID();
+        final GMLWriter gmlWriter = new GMLWriter();
+        gmlWriter.setNamespace(false);
+        gmlWriter.setPrefix(null);
+
+        // Both sentinel values must be excluded; joining the checks with "||" is always true.
+        if (srid != -1 && srid != 0) {
+            gmlWriter.setSrsName("EPSG:" + srid);
+        }
+        final String gml = gmlWriter.write(geo);
+        // Hack to replace a gml 2.0 deprecated element in the Polygon.
+        // It should tolerate this as it does other deprecated elements like <gml:coordinates>.
+        return gml.replace("outerBoundaryIs", "exterior");
+    }
+
+    /**
+     * Converts a geometry to GML with the hand-written templates in this class.
+     * Only points, line strings and polygons are supported.
+     *
+     * @param geo the geometry to encode.
+     * @return the GML encoding of the geometry.
+     * @throws Error for any other geometry type.
+     */
+    private static String geoToGmlRough(final Geometry geo) {
+        final Geometries geometryType = Geometries.get(geo);
+        final String gml;
+        switch (geometryType) {
+            case POINT:
+                gml = geoToGml((Point) geo);
+                break;
+            case LINESTRING:
+                gml = geoToGml((LineString) geo);
+                break;
+            case POLYGON:
+                gml = geoToGml((Polygon) geo);
+                break;
+            default:
+                // MULTIPOINT, MULTILINESTRING, MULTIPOLYGON and anything else have no template.
+                throw new Error("No code to convert to GML for this type: "+geometryType);
+        }
+        return gml;
+    }
+
+ private static Point point(final double x, final double y) {
+ return gf.createPoint(new Coordinate(x, y));
+ }
+
+ private static String geoToGml(final Point point) {
+ //CRS:84 long X,lat Y
+ //ESPG:4326 lat Y,long X
+ return "<Point"//
+ + " srsName='CRS:84'"// TODO: point.getSRID()
+ + "><pos>"+point.getX()+" "+point.getY()+"</pos> "// assumes Y=lat X=long
+ + " </Point>";
+ }
+
+ private static LineString line(final double x1, final double y1, final double x2, final double y2) {
+ return new LineString(new PackedCoordinateSequence.Double(new double[] { x1, y1, x2, y2 }, 2), gf);
+ }
+
+    /**
+     * Converts a line string to GML using a fixed template.
+     *
+     * @param line the line string to encode.
+     * @return XML text that is a GML literal of the line; every coordinate is written as " x y" in the posList.
+     */
+    private static String geoToGml(final LineString line) {
+        final StringBuilder gml = new StringBuilder();
+        gml.append(" <gml:LineString srsName=\"http://www.opengis.net/def/crs/EPSG/0/4326\" xmlns:gml='http://www.opengis.net/gml'>\n");
+        gml.append("<gml:posList srsDimension=\"2\">");
+        final Coordinate[] vertices = line.getCoordinates();
+        for (int i = 0; i < vertices.length; i++) {
+            // EPSG:4326 lat/long
+            gml.append(" ").append(vertices[i].x).append(" ").append(vertices[i].y);
+        }
+        gml.append("</gml:posList></gml:LineString >");
+        return gml.toString();
+    }
+
+    /**
+     * Creates a polygon without holes whose shell is the closed ring described by the array.
+     *
+     * @param arr the ring ordinates as x, y pairs.
+     * @return the new {@link Polygon}.
+     */
+    private static Polygon poly(final double[] arr) {
+        final LinearRing[] noHoles = {};
+        final LinearRing shell = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2));
+        return gf.createPolygon(shell, noHoles);
+    }
+
+    /**
+     * Converts a polygon to GML using a fixed template.
+     *
+     * @param poly the polygon to encode.
+     * @return XML text that is a GML literal of the polygon; every coordinate is written as " x y" in the posList.
+     */
+    private static String geoToGml(final Polygon poly) {
+        final StringBuilder gml = new StringBuilder();
+        gml.append("<gml:Polygon srsName=\"EPSG:4326\" xmlns:gml='http://www.opengis.net/gml'>\r\n");
+        gml.append("<gml:exterior><gml:LinearRing>\r\n");
+        gml.append("<gml:posList srsDimension='2'>\r\n");
+        final Coordinate[] vertices = poly.getCoordinates();
+        for (int i = 0; i < vertices.length; i++) {
+            // EPSG:4326 lat/long, separated by spaces (not commas)
+            gml.append(" ").append(vertices[i].x).append(" ").append(vertices[i].y);
+        }
+        gml.append("</gml:posList>\r\n");
+        gml.append("</gml:LinearRing></gml:exterior>\r\n</gml:Polygon>\r\n");
+        return gml.toString();
+    }
+
+ private static double[] bbox(final double x1, final double y1, final double x2, final double y2) {
+ return new double[] { x1, y1, x1, y2, x2, y2, x2, y1, x1, y1 };
+ }
+
+    /**
+     * Asserts that a query returned exactly the statements of the expected geometries.
+     * Each expected geometry is encoded with this instance's schema and encoding method,
+     * converted to a Rya statement and back, and the resulting set is compared with the
+     * set of statements read from the iteration.
+     *
+     * @param actual the query result iteration.
+     * @param expected the geometries whose statements must make up the result.
+     * @throws Exception if reading the iteration fails.
+     */
+    private void compare(final CloseableIteration<Statement, ?> actual, final Geometry... expected) throws Exception {
+        final Set<Statement> wanted = Sets.newHashSet();
+        for (int i = 0; i < expected.length; i++) {
+            final RyaStatement ryaStatement = createRyaStatement(expected[i], schemaToTest, encodeMethod);
+            wanted.add(RyaToRdfConversions.convertStatement(ryaStatement));
+        }
+
+        final Set<Statement> found = getSet(actual);
+        Assert.assertEquals(wanted, found);
+    }
+
+ private static final Geometry[] EMPTY_RESULTS = {};
+
+ /** Polygon D must survive encoding in the schema under test and parsing back. */
+ @Test
+ public void testParsePoly() throws Exception {
+ assertParseable(D);
+ }
+
+ /** Line string E must survive encoding in the schema under test and parsing back. */
+ @Test
+ public void testParseLine() throws Exception {
+ assertParseable(E);
+ }
+
+ /** Point F must survive encoding in the schema under test and parsing back. */
+ @Test
+ public void testParsePoint() throws Exception {
+ assertParseable(F);
+ }
+
+    /**
+     * Encodes a geometry as WKT or GML (per {@code schemaToTest}), parses the resulting
+     * statement back into a geometry, and asserts that the result matches the original
+     * under {@code equalsNorm}, {@code equals} and {@code equalsExact}.
+     *
+     * @param originalGeom the original {@link Geometry}.
+     * @throws ParseException if the encoded literal cannot be parsed.
+     */
+    public void assertParseable(final Geometry originalGeom) throws ParseException {
+        final Statement encoded = genericStatement(originalGeom, schemaToTest, encodeMethod);
+        final Geometry parsedGeom = GeoParseUtils.getGeometry(encoded, new GmlParser());
+
+        final String mismatch = "Parsed should equal original: "+originalGeom+" parsed: "+parsedGeom;
+        final boolean sameWhenNormalized = originalGeom.equalsNorm(parsedGeom);
+        assertTrue(mismatch, sameWhenNormalized);
+
+        // the stricter comparisons also pass
+        assertEquals(originalGeom, parsedGeom);
+        final boolean sameExactly = originalGeom.equalsExact(parsedGeom);
+        assertTrue(sameExactly);
+    }
+
+ /**
+ * An equals query with a stored geometry returns only that geometry;
+ * an equals query with a geometry that was never stored returns nothing.
+ */
+ @Test
+ public void testEquals() throws Exception {
+ // point: F is stored, (-1, -1) is not
+ compare(g.queryEquals(F, EMPTY_CONSTRAINTS), F);
+ compare(g.queryEquals(point(-1, -1), EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+
+ // line: E is stored, the line from (-1, -1) to (0, 0) is not
+ compare(g.queryEquals(E, EMPTY_CONSTRAINTS), E);
+ compare(g.queryEquals(line(-1, -1, 0, 0), EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+
+ // poly: A is stored, the narrower box starting at x = -2 is not
+ compare(g.queryEquals(A, EMPTY_CONSTRAINTS), A);
+ compare(g.queryEquals(poly(bbox(-2, -2, 1, 2)), EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ }
+
+ /** A disjoint query returns every stored geometry that shares no point with the query geometry. */
+ @Test
+ public void testDisjoint() throws Exception {
+ // point: everything except A and F itself is expected
+ compare(g.queryDisjoint(F, EMPTY_CONSTRAINTS), B, C, D, E, G);
+
+ // line: everything except A, D and E itself is expected
+ compare(g.queryDisjoint(E, EMPTY_CONSTRAINTS), B, C, F, G);
+
+ // poly: nothing is expected to be disjoint from A; everything except A and B is disjoint from B
+ compare(g.queryDisjoint(A, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ compare(g.queryDisjoint(B, EMPTY_CONSTRAINTS), C, D, F, E, G);
+ }
+
+ /** An intersects query with point F is expected to return polygon A and F itself. */
+ @Test
+ public void testIntersectsPoint() throws Exception {
+ compare(g.queryIntersects(F, EMPTY_CONSTRAINTS), A, F);
+ }
+
+ /** An intersects query with line E is expected to return polygons A and D and E itself. */
+ @Test
+ public void testIntersectsLine() throws Exception {
+ compare(g.queryIntersects(E, EMPTY_CONSTRAINTS), A, E, D);
+ }
+
+ /** An intersects query with polygon A is expected to return all seven stored geometries. */
+ @Test
+ public void testIntersectsPoly() throws Exception {
+ compare(g.queryIntersects(A, EMPTY_CONSTRAINTS), A, B, C, D, F, E, G);
+ }
+
+ /**
+ * A touches query with point F is expected to return nothing, while point G (1, 1),
+ * which sits at x = 1 where A ends and C begins, is expected to return A and C.
+ */
+ @Test
+ public void testTouchesPoint() throws Exception {
+ compare(g.queryTouches(F, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ compare(g.queryTouches(G, EMPTY_CONSTRAINTS), A, C);
+ }
+
+ /** A touches query with line E, whose end point (0, -1) is a corner of D, is expected to return D. */
+ @Test
+ public void testTouchesLine() throws Exception {
+ compare(g.queryTouches(E, EMPTY_CONSTRAINTS), D);
+ }
+
+ /** A touches query with polygon A is expected to return polygon C and point G. */
+ @Test
+ public void testTouchesPoly() throws Exception {
+ compare(g.queryTouches(A, EMPTY_CONSTRAINTS), C,G);
+ }
+
+ /** A crosses query with a point (F, G, or the unstored point (2, 0)) is expected to return nothing. */
+ @Test
+ public void testCrossesPoint() throws Exception {
+ compare(g.queryCrosses(F, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ compare(g.queryCrosses(G, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ compare(g.queryCrosses(point(2, 0), EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ }
+
+ /** A crosses query with line E is expected to return polygon A. */
+ @Test
+ public void testCrossesLine() throws Exception {
+ compare(g.queryCrosses(E, EMPTY_CONSTRAINTS), A);
+ }
+
+ /**
+ * A crosses query with polygon A is expected to return line E, and so is a query with
+ * a small unstored box lying strictly inside the bounding box of E.
+ */
+ @Test
+ public void testCrossesPoly() throws Exception {
+ compare(g.queryCrosses(A, EMPTY_CONSTRAINTS), E);
+ compare(g.queryCrosses(poly(bbox(-0.9, -2.9, -0.1, -1.1)), EMPTY_CONSTRAINTS), E);
+ }
+
+ /** A within query returns the stored geometries expected to lie within the query geometry. */
+ @Test
+ public void testWithin() throws Exception {
+ // point: only F itself
+ compare(g.queryWithin(F, EMPTY_CONSTRAINTS), F);
+
+ // line: only E itself
+ compare(g.queryWithin(E, EMPTY_CONSTRAINTS), E);
+
+ // poly: A itself plus polygon B and point F
+ compare(g.queryWithin(A, EMPTY_CONSTRAINTS), A, B, F);
+ }
+
+ /** A contains query with point F is expected to return polygon A and F itself. */
+ @Test
+ public void testContainsPoint() throws Exception {
+ compare(g.queryContains(F, EMPTY_CONSTRAINTS), A, F);
+ }
+
+ /** A contains query with line E is expected to return only E itself. */
+ @Test
+ public void testContainsLine() throws Exception {
+ compare(g.queryContains(E, EMPTY_CONSTRAINTS), E);
+ }
+
+ /**
+ * A contains query with polygon A is expected to return only A, while a query with
+ * polygon B is expected to return both A and B.
+ */
+ @Test
+ public void testContainsPoly() throws Exception {
+ compare(g.queryContains(A, EMPTY_CONSTRAINTS), A);
+ compare(g.queryContains(B, EMPTY_CONSTRAINTS), A, B);
+ }
+
+ /** An overlaps query with point F is expected to return nothing. */
+ @Test
+ public void testOverlapsPoint() throws Exception {
+ compare(g.queryOverlaps(F, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ }
+
+ /** An overlaps query with line E is expected to return nothing. */
+ @Test
+ public void testOverlapsLine() throws Exception {
+ compare(g.queryOverlaps(E, EMPTY_CONSTRAINTS), EMPTY_RESULTS);
+ }
+
+ /** An overlaps query with polygon A is expected to return polygon D. */
+ @Test
+ public void testOverlapsPoly() throws Exception {
+ compare(g.queryOverlaps(A, EMPTY_CONSTRAINTS), D);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerTest.java b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerTest.java
new file mode 100644
index 0000000..1930a50
--- /dev/null
+++ b/extras/rya.geoindexing/geo.geowave/src/test/java/org/apache/rya/indexing/accumulo/geo/GeoWaveIndexerTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.accumulo.geo;
+
+import static org.apache.rya.api.resolver.RdfToRyaConversions.convertStatement;
+import static org.apache.rya.indexing.GeoIndexingTestUtils.getSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.commons.io.FileUtils;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexerType;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.LinearRing;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.Polygon;
+import com.vividsolutions.jts.geom.PrecisionModel;
+import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence;
+
+import mil.nga.giat.geowave.datastore.accumulo.minicluster.MiniAccumuloClusterFactory;
+
+/**
+ * Tests higher level functioning of the geoindexer parse WKT, predicate list,
+ * prime and anti meridian, delete, search, context, search with Statement Constraints.
+ */
+public class GeoWaveIndexerTest {
+
+ private static final StatementConstraints EMPTY_CONSTRAINTS = new StatementConstraints();
+
+ private AccumuloRdfConfiguration conf;
+ private final GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+
+ private static File tempAccumuloDir;
+ private static MiniAccumuloClusterImpl accumulo;
+
+ private static final boolean IS_MOCK = true;
+
+ private static final String ACCUMULO_USER = IS_MOCK ? "username" : "root";
+ private static final String ACCUMULO_PASSWORD = "password";
+
+    /**
+     * Runs once before all tests. Does nothing when {@link #IS_MOCK} is true; otherwise
+     * creates a temporary directory and starts a mini Accumulo cluster in it.
+     */
+    @BeforeClass
+    public static void setup() throws AccumuloException, AccumuloSecurityException, IOException, InterruptedException {
+        if (IS_MOCK) {
+            return;
+        }
+
+        tempAccumuloDir = Files.createTempDir();
+        final MiniAccumuloConfigImpl clusterConfig = new MiniAccumuloConfigImpl(tempAccumuloDir, ACCUMULO_PASSWORD);
+        accumulo = MiniAccumuloClusterFactory.newAccumuloCluster(clusterConfig, GeoWaveIndexerTest.class);
+        accumulo.start();
+    }
+
+    /**
+     * Runs once after all tests. When {@link #IS_MOCK} is false, stops the mini Accumulo
+     * cluster and deletes its temporary directory. Both steps are skipped for whatever
+     * {@link #setup()} did not manage to create, so a failed setup is not masked by a
+     * {@link NullPointerException} thrown from here.
+     *
+     * @throws IOException if the cluster cannot be stopped or the directory cannot be deleted.
+     * @throws InterruptedException if stopping the cluster is interrupted.
+     */
+    @AfterClass
+    public static void cleanup() throws IOException, InterruptedException {
+        if (!IS_MOCK) {
+            try {
+                if (accumulo != null) {
+                    accumulo.stop();
+                }
+            } finally {
+                // Always remove the temp directory, even when stopping the cluster failed.
+                if (tempAccumuloDir != null) {
+                    FileUtils.deleteDirectory(tempAccumuloDir);
+                }
+            }
+        }
+    }
+
+    /**
+     * Runs before each test method. Builds a fresh configuration and deletes every table
+     * whose name starts with the GeoWave index table name.
+     *
+     * @throws Exception if the connector or the table operations fail.
+     */
+    @Before
+    public void before() throws Exception {
+        conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("triplestore_");
+        final String geoTableName = GeoWaveGeoIndexer.getTableName(conf);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, IS_MOCK);
+        conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
+        final String instanceName = IS_MOCK ? "INSTANCE" : accumulo.getInstanceName();
+        final String zooKeepers = IS_MOCK ? "localhost" : accumulo.getZooKeepers();
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooKeepers);
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U");
+        conf.set(OptionalConfigUtils.USE_GEO, "true");
+        conf.set(OptionalConfigUtils.GEO_INDEXER_TYPE, GeoIndexerType.GEO_WAVE.toString());
+
+        // Collect the matching table names first, then delete them in a second pass.
+        final TableOperations tableOps = ConfigUtils.getConnector(conf).tableOperations();
+        final Set<String> staleTables = Sets.newHashSet();
+        for (final String existing : tableOps.list()) {
+            if (!existing.startsWith(geoTableName)) {
+                continue;
+            }
+            staleTables.add(existing);
+        }
+        for (final String stale : staleTables) {
+            tableOps.delete(stale);
+        }
+    }
+
+    /**
+     * With the geo predicate list restricted to {@code pred:1} and {@code pred:2}, only
+     * statements using one of those predicates with a WKT literal object are expected to
+     * be returned by an equals query for the stored point.
+     */
+    @Test
+    public void testRestrictPredicatesSearch() throws Exception {
+        conf.setStrings(ConfigUtils.GEO_PREDICATES_LIST, "pred:1,pred:2");
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory vf = new ValueFactoryImpl();
+
+            final Point queryPoint = gf.createPoint(new Coordinate(10, 10));
+            final Value wktPoint = vf.createLiteral("Point(10 10)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final URI unlistedPredicate = GeoConstants.GEO_AS_WKT;
+
+            // Not expected in the index: the predicate is not in the predicate list.
+            final Statement unlisted1 = new StatementImpl(vf.createURI("foo:subj1"), unlistedPredicate, wktPoint);
+            indexer.storeStatement(convertStatement(unlisted1));
+            final Statement unlisted2 = new StatementImpl(vf.createURI("foo:subj2"), unlistedPredicate, wktPoint);
+            indexer.storeStatement(convertStatement(unlisted2));
+
+            final URI listedPredicate1 = vf.createURI("pred:1");
+            final URI listedPredicate2 = vf.createURI("pred:2");
+
+            // Expected in the index: both predicates are in the predicate list.
+            final Statement listed1 = new StatementImpl(vf.createURI("foo:subj3"), listedPredicate1, wktPoint);
+            final Statement listed2 = new StatementImpl(vf.createURI("foo:subj4"), listedPredicate2, wktPoint);
+            indexer.storeStatement(convertStatement(listed1));
+            indexer.storeStatement(convertStatement(listed2));
+
+            // Not expected in the index: the object is not valid wkt.
+            final Statement badWkt = new StatementImpl(vf.createURI("foo:subj5"), listedPredicate1, vf.createLiteral("soint(10 10)"));
+            indexer.storeStatement(convertStatement(badWkt));
+
+            // Not expected in the index: the object is not a literal.
+            final Statement notLiteral = new StatementImpl(vf.createURI("foo:subj6"), listedPredicate1, vf.createURI("p:Point(10 10)"));
+            indexer.storeStatement(convertStatement(notLiteral));
+
+            indexer.flush();
+
+            final Set<Statement> found = getSet(indexer.queryEquals(queryPoint, EMPTY_CONSTRAINTS));
+            Assert.assertEquals(2, found.size());
+            Assert.assertTrue(found.contains(listed1));
+            Assert.assertTrue(found.contains(listed2));
+        }
+    }
+
+    /**
+     * Stores a point at (0, 0) and checks within queries against squares centred on it,
+     * a square ring with a hole over the point, and a square that lies beside the point.
+     */
+    @Test
+    public void testPrimeMeridianSearch() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory vf = new ValueFactoryImpl();
+            final Statement origin = new ContextStatementImpl(
+                    vf.createURI("foo:subj"),
+                    GeoConstants.GEO_AS_WKT,
+                    vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT),
+                    vf.createURI("foo:context"));
+            indexer.storeStatement(convertStatement(origin));
+            indexer.flush();
+
+            // Concentric squares around (0, 0) reaching out to 1, 2 and 3.
+            final double[] reachOne = { 1, 1, -1, 1, -1, -1, 1, -1, 1, 1 };
+            final double[] reachTwo = { 2, 2, -2, 2, -2, -2, 2, -2, 2, 2 };
+            final double[] reachThree = { 3, 3, -3, 3, -3, -3, 3, -3, 3, 3 };
+            final LinearRing ringOne = gf.createLinearRing(new PackedCoordinateSequence.Double(reachOne, 2));
+            final LinearRing ringTwo = gf.createLinearRing(new PackedCoordinateSequence.Double(reachTwo, 2));
+            final LinearRing ringThree = gf.createLinearRing(new PackedCoordinateSequence.Double(reachThree, 2));
+
+            final LinearRing[] noHoles = {};
+            final Polygon squareOne = gf.createPolygon(ringOne, noHoles);
+            final Polygon squareTwo = gf.createPolygon(ringTwo, noHoles);
+            final Polygon squareThree = gf.createPolygon(ringThree, noHoles);
+
+            final Set<Statement> onlyOrigin = Sets.newHashSet(origin);
+            Assert.assertEquals(onlyOrigin, getSet(indexer.queryWithin(squareOne, EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(onlyOrigin, getSet(indexer.queryWithin(squareTwo, EMPTY_CONSTRAINTS)));
+            Assert.assertEquals(onlyOrigin, getSet(indexer.queryWithin(squareThree, EMPTY_CONSTRAINTS)));
+
+            // Test a ring with a hole in it
+            final Polygon threeMinusTwo = gf.createPolygon(ringThree, new LinearRing[] { ringTwo });
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(threeMinusTwo, EMPTY_CONSTRAINTS)));
+
+            // test a ring outside the point
+            final double[] beside = { 3, 3, 1, 3, 1, 1, 3, 1, 3, 3 };
+            final LinearRing ringBeside = gf.createLinearRing(new PackedCoordinateSequence.Double(beside, 2));
+            final Polygon squareBeside = gf.createPolygon(ringBeside, noHoles);
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(squareBeside, EMPTY_CONSTRAINTS)));
+        }
+    }
+
+    /**
+     * Stores a point at (-77.03524, 38.889468) and checks that a within query finds it with
+     * a square around the point and finds nothing with the neighbouring square to the east.
+     */
+    @Test
+    public void testDcSearch() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory vf = new ValueFactoryImpl();
+            final Statement dcPoint = new ContextStatementImpl(
+                    vf.createURI("foo:subj"),
+                    GeoConstants.GEO_AS_WKT,
+                    vf.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT),
+                    vf.createURI("foo:context"));
+            indexer.storeStatement(convertStatement(dcPoint));
+            indexer.flush();
+
+            final LinearRing[] noHoles = {};
+
+            // test a ring around dc
+            final double[] around = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ringAround = gf.createLinearRing(new PackedCoordinateSequence.Double(around, 2));
+            final Polygon squareAround = gf.createPolygon(ringAround, noHoles);
+            Assert.assertEquals(Sets.newHashSet(dcPoint), getSet(indexer.queryWithin(squareAround, EMPTY_CONSTRAINTS)));
+
+            // test a ring outside the point
+            final double[] beside = { -77, 39, -76, 39, -76, 38, -77, 38, -77, 39 };
+            final LinearRing ringBeside = gf.createLinearRing(new PackedCoordinateSequence.Double(beside, 2));
+            final Polygon squareBeside = gf.createPolygon(ringBeside, noHoles);
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(squareBeside, EMPTY_CONSTRAINTS)));
+        }
+    }
+
+    /**
+     * Stores a point, deletes it again, and checks that within queries no longer return it:
+     * with a square around where the point was, with a neighbouring square, and with a
+     * rectangle spanning longitudes -180..180 and latitudes -90..90.
+     */
+    @Test
+    public void testDeleteSearch() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory vf = new ValueFactoryImpl();
+            final Statement dcPoint = new ContextStatementImpl(
+                    vf.createURI("foo:subj"),
+                    GeoConstants.GEO_AS_WKT,
+                    vf.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT),
+                    vf.createURI("foo:context"));
+            indexer.storeStatement(convertStatement(dcPoint));
+            indexer.flush();
+
+            indexer.deleteStatement(convertStatement(dcPoint));
+
+            final LinearRing[] noHoles = {};
+
+            // test a ring that the point would be inside of if not deleted
+            final double[] around = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ringAround = gf.createLinearRing(new PackedCoordinateSequence.Double(around, 2));
+            final Polygon squareAround = gf.createPolygon(ringAround, noHoles);
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(squareAround, EMPTY_CONSTRAINTS)));
+
+            // test a ring that the point would be outside of if not deleted
+            final double[] beside = { -77, 39, -76, 39, -76, 38, -77, 38, -77, 39 };
+            final LinearRing ringBeside = gf.createLinearRing(new PackedCoordinateSequence.Double(beside, 2));
+            final Polygon squareBeside = gf.createPolygon(ringBeside, noHoles);
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(squareBeside, EMPTY_CONSTRAINTS)));
+
+            // test a ring for the whole world and make sure the point is gone
+            final double[] everywhere = { -180, 90, 180, 90, 180, -90, -180, -90, -180, 90 };
+            final LinearRing ringEverywhere = gf.createLinearRing(new PackedCoordinateSequence.Double(everywhere, 2));
+            final Polygon wholeWorld = gf.createPolygon(ringEverywhere, noHoles);
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(wholeWorld, EMPTY_CONSTRAINTS)));
+        }
+    }
+
+    /**
+     * Stores a single point statement and runs a "within" query with a ring
+     * around that point, once constrained to the statement's own context
+     * (expects the statement) and once constrained to a different context
+     * (expects nothing).
+     */
+    @Test
+    public void testDcSearchWithContext() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory valueFactory = new ValueFactoryImpl();
+            final Resource subj = valueFactory.createURI("foo:subj");
+            final URI pred = GeoConstants.GEO_AS_WKT;
+            final Value wktPoint = valueFactory.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Resource storedContext = valueFactory.createURI("foo:context");
+
+            final Statement pointStatement = new ContextStatementImpl(subj, pred, wktPoint, storedContext);
+            indexer.storeStatement(convertStatement(pointStatement));
+            indexer.flush();
+
+            // ring that contains the stored point
+            final double[] ringCoords = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ring = gf.createLinearRing(new PackedCoordinateSequence.Double(ringCoords, 2));
+            final Polygon searchArea = gf.createPolygon(ring, new LinearRing[0]);
+
+            // matching context: the statement is found
+            final StatementConstraints matchingContext = new StatementConstraints().setContext(storedContext);
+            Assert.assertEquals(Sets.newHashSet(pointStatement), getSet(indexer.queryWithin(searchArea, matchingContext)));
+
+            // non-matching context: nothing is found
+            final StatementConstraints otherContext = new StatementConstraints().setContext(valueFactory.createURI("foo:context2"));
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(searchArea, otherContext)));
+        }
+    }
+
+    /**
+     * Stores a single point statement and runs a "within" query with a ring
+     * around that point, once constrained to the statement's own subject
+     * (expects the statement) and once constrained to a different subject
+     * (expects nothing).
+     */
+    @Test
+    public void testDcSearchWithSubject() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory valueFactory = new ValueFactoryImpl();
+            final Resource storedSubject = valueFactory.createURI("foo:subj");
+            final URI pred = GeoConstants.GEO_AS_WKT;
+            final Value wktPoint = valueFactory.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Resource ctx = valueFactory.createURI("foo:context");
+
+            final Statement pointStatement = new ContextStatementImpl(storedSubject, pred, wktPoint, ctx);
+            indexer.storeStatement(convertStatement(pointStatement));
+            indexer.flush();
+
+            // ring that contains the stored point
+            final double[] ringCoords = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ring = gf.createLinearRing(new PackedCoordinateSequence.Double(ringCoords, 2));
+            final Polygon searchArea = gf.createPolygon(ring, new LinearRing[0]);
+
+            // matching subject: the statement is found
+            final StatementConstraints matchingSubject = new StatementConstraints().setSubject(storedSubject);
+            Assert.assertEquals(Sets.newHashSet(pointStatement), getSet(indexer.queryWithin(searchArea, matchingSubject)));
+
+            // non-matching subject: nothing is found
+            final StatementConstraints otherSubject = new StatementConstraints().setSubject(valueFactory.createURI("foo:subj2"));
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(searchArea, otherSubject)));
+        }
+    }
+
+    /**
+     * Stores a single point statement and runs "within" queries with a ring
+     * around that point using three constraint sets: matching subject and
+     * context together (expects the statement), a wrong context alone
+     * (expects nothing), and a wrong subject alone (expects nothing).
+     */
+    @Test
+    public void testDcSearchWithSubjectAndContext() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory valueFactory = new ValueFactoryImpl();
+            final Resource storedSubject = valueFactory.createURI("foo:subj");
+            final URI pred = GeoConstants.GEO_AS_WKT;
+            final Value wktPoint = valueFactory.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Resource storedContext = valueFactory.createURI("foo:context");
+
+            final Statement pointStatement = new ContextStatementImpl(storedSubject, pred, wktPoint, storedContext);
+            indexer.storeStatement(convertStatement(pointStatement));
+            indexer.flush();
+
+            // ring that contains the stored point
+            final double[] ringCoords = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ring = gf.createLinearRing(new PackedCoordinateSequence.Double(ringCoords, 2));
+            final Polygon searchArea = gf.createPolygon(ring, new LinearRing[0]);
+
+            // matching context and subject: the statement is found
+            final StatementConstraints matchingBoth = new StatementConstraints().setContext(storedContext).setSubject(storedSubject);
+            Assert.assertEquals(Sets.newHashSet(pointStatement), getSet(indexer.queryWithin(searchArea, matchingBoth)));
+
+            // non-matching context: nothing is found
+            final StatementConstraints otherContext = new StatementConstraints().setContext(valueFactory.createURI("foo:context2"));
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(searchArea, otherContext)));
+
+            // non-matching subject: nothing is found
+            final StatementConstraints otherSubject = new StatementConstraints().setSubject(valueFactory.createURI("foo:subj2"));
+            Assert.assertEquals(Sets.newHashSet(), getSet(indexer.queryWithin(searchArea, otherSubject)));
+        }
+    }
+
+    /**
+     * Stores a single point statement and runs a "within" query with a ring
+     * around that point, once constrained to the statement's own predicate
+     * (expects the statement) and once constrained to a different predicate
+     * (expects nothing).
+     */
+    @Test
+    public void testDcSearchWithPredicate() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory valueFactory = new ValueFactoryImpl();
+            final Resource subj = valueFactory.createURI("foo:subj");
+            final URI storedPredicate = GeoConstants.GEO_AS_WKT;
+            final Value wktPoint = valueFactory.createLiteral("Point(-77.03524 38.889468)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Resource ctx = valueFactory.createURI("foo:context");
+
+            final Statement pointStatement = new ContextStatementImpl(subj, storedPredicate, wktPoint, ctx);
+            indexer.storeStatement(convertStatement(pointStatement));
+            indexer.flush();
+
+            // ring that contains the stored point
+            final double[] ringCoords = { -78, 39, -77, 39, -77, 38, -78, 38, -78, 39 };
+            final LinearRing ring = gf.createLinearRing(new PackedCoordinateSequence.Double(ringCoords, 2));
+            final Polygon searchArea = gf.createPolygon(ring, new LinearRing[0]);
+
+            // matching predicate: the statement is found
+            Assert.assertEquals(Sets.newHashSet(pointStatement),
+                    getSet(indexer.queryWithin(searchArea, new StatementConstraints().setPredicates(Collections.singleton(storedPredicate)))));
+
+            // non-matching predicate: nothing is found
+            Assert.assertEquals(Sets.newHashSet(),
+                    getSet(indexer.queryWithin(searchArea, new StatementConstraints().setPredicates(Collections.singleton(valueFactory.createURI("other:pred"))))));
+        }
+    }
+
+    /**
+     * Verifies that a "within" search works when the query ring crosses the
+     * anti meridian: one point is stored on each side of it (longitude 179 and -179)
+     * and both are expected back.
+     * <p>
+     * The {@code @Test} annotation below is commented out, so this method is not
+     * picked up as a test.
+     */
+    // @Test
+    public void testAntiMeridianSearch() throws Exception {
+        try (final GeoWaveGeoIndexer indexer = new GeoWaveGeoIndexer()) {
+            indexer.setConf(conf);
+            indexer.purge(conf);
+
+            final ValueFactory valueFactory = new ValueFactoryImpl();
+            final Resource sharedContext = valueFactory.createURI("foo:context");
+
+            // point just east of the anti meridian
+            final Resource eastSubject = valueFactory.createURI("foo:subj:east");
+            final URI eastPredicate = GeoConstants.GEO_AS_WKT;
+            final Value eastPoint = valueFactory.createLiteral("Point(179 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Statement eastStatement = new ContextStatementImpl(eastSubject, eastPredicate, eastPoint, sharedContext);
+            indexer.storeStatement(convertStatement(eastStatement));
+
+            // point just west of the anti meridian
+            final Resource westSubject = valueFactory.createURI("foo:subj:west");
+            final URI westPredicate = GeoConstants.GEO_AS_WKT;
+            final Value westPoint = valueFactory.createLiteral("Point(-179 0)", GeoConstants.XMLSCHEMA_OGC_WKT);
+            final Statement westStatement = new ContextStatementImpl(westSubject, westPredicate, westPoint, sharedContext);
+            indexer.storeStatement(convertStatement(westStatement));
+
+            indexer.flush();
+
+            // ring running from longitude 178.1 to -178, i.e. across the anti meridian
+            final double[] crossingCoords = { 178.1, 1, -178, 1, -178, -1, 178.1, -1, 178.1, 1 };
+            final LinearRing crossingRing = gf.createLinearRing(new PackedCoordinateSequence.Double(crossingCoords, 2));
+            final Polygon crossingArea = gf.createPolygon(crossingRing, new LinearRing[0]);
+
+            Assert.assertEquals(Sets.newHashSet(eastStatement, westStatement), getSet(indexer.queryWithin(crossingArea, EMPTY_CONSTRAINTS)));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/pom.xml b/extras/rya.geoindexing/geo.mongo/pom.xml
new file mode 100644
index 0000000..f8c4f49
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/pom.xml
@@ -0,0 +1,41 @@
+<?xml version='1.0'?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.geoindexing</artifactId>
+ <version>3.2.11-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>geo.mongo</artifactId>
+ <name>Apache Rya Geo Indexing using MongoDB</name>
+ <description>Implementation of geospatial indexing for MongoDB-backed Rya</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <geotools.version>16.0</geotools.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>geo.common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.geotools.xsd</groupId>
+ <artifactId>gt-xsd-gml3</artifactId>
+ <version>${geotools.version}</version>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geoExamples/RyaMongoGeoDirectExample.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geoExamples/RyaMongoGeoDirectExample.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geoExamples/RyaMongoGeoDirectExample.java
new file mode 100644
index 0000000..e42ce07
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geoExamples/RyaMongoGeoDirectExample.java
@@ -0,0 +1,238 @@
+package org.apache.rya.indexing.geoExamples;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.GeoRyaSailFactory;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils;
+import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
+import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration.MongoDBIndexingConfigBuilder;
+import org.apache.rya.mongodb.MockMongoFactory;
+import org.apache.rya.mongodb.MongoConnectorFactory;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.QueryResultHandlerException;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandler;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.Update;
+import org.openrdf.query.UpdateExecutionException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+
+import com.mongodb.MongoClient;
+import com.mongodb.ServerAddress;
+
+/**
+ * Stand-alone example that inserts one geospatial point through a Rya sail
+ * configured for MongoDB and then runs two GeoSPARQL {@code sfWithin}
+ * queries against it: one with a ring that contains the point and one with
+ * a ring that does not.
+ * <p>
+ * When {@code USE_MOCK} is {@code true}, the MongoDB host and port are taken
+ * from a client created by {@link MockMongoFactory}; otherwise the
+ * {@code MONGO_INSTANCE_*} constants and the placeholder credentials in
+ * {@link #getConf()} are used and must be filled in.
+ */
+public class RyaMongoGeoDirectExample {
+    private static final Logger log = Logger.getLogger(RyaMongoGeoDirectExample.class);
+
+    //
+    // Connection configuration parameters
+    //
+
+    private static final boolean PRINT_QUERIES = true;
+    private static final String MONGO_DB = "rya";
+    private static final String MONGO_COLL_PREFIX = "rya_";
+    private static final boolean USE_MOCK = true;
+    private static final boolean USE_INFER = true;
+    private static final String MONGO_INSTANCE_URL = "localhost";
+    private static final String MONGO_INSTANCE_PORT = "27017";
+
+    /** Mock factory created by {@link #getConf()} when {@code USE_MOCK} is set; shut down at the end of {@link #main}. */
+    private static MockMongoFactory mock = null;
+
+    /**
+     * Builds the configuration, opens a repository connection on a geo-enabled
+     * Rya sail, runs the example, and releases all resources again.
+     *
+     * @param args - Ignored.
+     * @throws Exception if configuration, connection, or the example itself fails.
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration conf = getConf();
+        conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, PRINT_QUERIES);
+        conf.setBoolean(OptionalConfigUtils.USE_GEO, true); // Note also the use of "GeoRyaSailFactory" below.
+        conf.setStrings(OptionalConfigUtils.GEO_PREDICATES_LIST, "http://www.opengis.net/ont/geosparql#asWKT"); // Note also the use of "GeoRyaSailFactory" below.
+
+        SailRepository repository = null;
+        SailRepositoryConnection conn = null;
+        try {
+            log.info("Connecting to Indexing Sail Repository.");
+            Sail sail = GeoRyaSailFactory.getInstance(conf);
+            repository = new SailRepository(sail);
+            conn = repository.getConnection();
+
+            long start = System.currentTimeMillis();
+            testAddPointAndWithinSearch(conn); // uses geospatial features
+
+            log.info("TIME: " + (System.currentTimeMillis() - start) / 1000.);
+        } finally {
+            log.info("Shutting down");
+            closeQuietly(conn);
+            closeQuietly(repository);
+            if (mock != null) {
+                mock.shutdown();
+            }
+            MongoConnectorFactory.closeMongoClient();
+        }
+    }
+
+    /**
+     * Try out some geospatial data and queries: inserts a feature with a point
+     * geometry, then checks that a ring containing the point yields at least one
+     * result and that a ring next to it yields none.
+     *
+     * @param conn - The open repository connection to update and query. (not null)
+     * @throws Exception if the update or one of the queries fails, or if a result
+     *         count does not match the expectation.
+     */
+    private static void testAddPointAndWithinSearch(SailRepositoryConnection conn) throws Exception {
+
+        String update = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+                + "INSERT DATA { " //
+                + " <urn:feature> a geo:Feature ; " //
+                + " geo:hasGeometry [ " //
+                + " a geo:Point ; " //
+                + " geo:asWKT \"Point(-77.03524 38.889468)\"^^geo:wktLiteral "//
+                + " ] . " //
+                + "}";
+
+        Update u = conn.prepareUpdate(QueryLanguage.SPARQL, update);
+        u.execute();
+
+        String queryString;
+        int resultCount;
+
+        // ring containing point
+        queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+                + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+                + "SELECT ?feature ?point ?wkt " //
+                + "{" //
+                + " ?feature a geo:Feature . "//
+                + " ?feature geo:hasGeometry ?point . "//
+                + " ?point a geo:Point . "//
+                + " ?point geo:asWKT ?wkt . "//
+                + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -77 39, -77 38, -78 38, -78 39))\"^^geo:wktLiteral)) " //
+                + "}";//
+        resultCount = countSolutions(conn, queryString);
+        log.info("Result count -- ring containing point: " + resultCount);
+        Validate.isTrue(resultCount >= 1); // may see points from during previous runs
+
+        // ring outside point
+        queryString = "PREFIX geo: <http://www.opengis.net/ont/geosparql#> "//
+                + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> "//
+                + "SELECT ?feature ?point ?wkt " //
+                + "{" //
+                + " ?feature a geo:Feature . "//
+                + " ?feature geo:hasGeometry ?point . "//
+                + " ?point a geo:Point . "//
+                + " ?point geo:asWKT ?wkt . "//
+                + " FILTER(geof:sfWithin(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " //
+                + "}";//
+        resultCount = countSolutions(conn, queryString);
+        log.info("Result count -- ring outside point: " + resultCount);
+        Validate.isTrue(resultCount == 0);
+    }
+
+    /**
+     * Evaluates a SPARQL tuple query and counts its solutions. Each solution is
+     * also printed by the {@link CountingResultHandler}.
+     *
+     * @param conn - The connection to evaluate the query on. (not null)
+     * @param queryString - The SPARQL SELECT query. (not null)
+     * @return The number of solutions the query produced.
+     * @throws Exception if the query cannot be prepared or evaluated.
+     */
+    private static int countSolutions(SailRepositoryConnection conn, String queryString) throws Exception {
+        TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString);
+        CountingResultHandler tupleHandler = new CountingResultHandler();
+        tupleQuery.evaluate(tupleHandler);
+        return tupleHandler.getCount();
+    }
+
+    /**
+     * Shuts the repository down without propagating failures.
+     *
+     * @param repository - The repository to shut down; {@code null} is ignored.
+     */
+    private static void closeQuietly(SailRepository repository) {
+        if (repository != null) {
+            try {
+                repository.shutDown();
+            } catch (RepositoryException e) {
+                // best effort during shutdown: record the failure but do not propagate it
+                log.warn("Could not shut down the repository.", e);
+            }
+        }
+    }
+
+    /**
+     * Closes the connection without propagating failures.
+     *
+     * @param conn - The connection to close; {@code null} is ignored.
+     */
+    private static void closeQuietly(SailRepositoryConnection conn) {
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (RepositoryException e) {
+                // best effort during shutdown: record the failure but do not propagate it
+                log.warn("Could not close the repository connection.", e);
+            }
+        }
+    }
+
+    /**
+     * Builds the MongoDB indexing configuration used by this example.
+     * <p>
+     * With {@code USE_MOCK} set, a {@link MockMongoFactory} is created (and kept in
+     * {@link #mock}) and the host and port of one of its clients are written into
+     * the configuration. Otherwise the fixed instance URL/port and placeholder
+     * credentials are used.
+     *
+     * @return The built configuration.
+     * @throws IOException if the mock factory or its client cannot be created.
+     */
+    private static Configuration getConf() throws IOException {
+
+        MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder()
+                .setUseMockMongo(USE_MOCK).setUseInference(USE_INFER).setAuths("U");
+
+        if (USE_MOCK) {
+            mock = MockMongoFactory.newFactory();
+            // The client is only needed to look up the address; always release it again.
+            MongoClient c = mock.newMongoClient();
+            try {
+                ServerAddress address = c.getAddress();
+                String url = address.getHost();
+                String port = Integer.toString(address.getPort());
+                builder.setMongoHost(url).setMongoPort(port);
+            } finally {
+                c.close();
+            }
+        } else {
+            // User name and password must be filled in:
+            builder = builder.setMongoUser("fill this in")
+                    .setMongoPassword("fill this in")
+                    .setMongoHost(MONGO_INSTANCE_URL)
+                    .setMongoPort(MONGO_INSTANCE_PORT);
+        }
+
+        return builder.setMongoDBName(MONGO_DB)
+                .setMongoCollectionPrefix(MONGO_COLL_PREFIX)
+                .setUseMongoFreetextIndex(true)
+                .setMongoFreeTextPredicates(RDFS.LABEL.stringValue()).build();
+
+    }
+
+
+    /**
+     * Result handler that counts the solutions of a tuple query and prints each
+     * one to standard output.
+     */
+    private static class CountingResultHandler implements TupleQueryResultHandler {
+        private int count = 0;
+
+        /**
+         * @return The number of solutions handled so far.
+         */
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void startQueryResult(List<String> arg0) throws TupleQueryResultHandlerException {
+            // nothing to prepare
+        }
+
+        @Override
+        public void handleSolution(BindingSet arg0) throws TupleQueryResultHandlerException {
+            count++;
+            System.out.println(arg0);
+        }
+
+        @Override
+        public void endQueryResult() throws TupleQueryResultHandlerException {
+            // nothing to clean up
+        }
+
+        @Override
+        public void handleBoolean(boolean arg0) throws QueryResultHandlerException {
+            // intentionally ignored: only solutions are counted
+        }
+
+        @Override
+        public void handleLinks(List<String> arg0) throws QueryResultHandlerException {
+            // intentionally ignored: only solutions are counted
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
new file mode 100644
index 0000000..926f357
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.bson.Document;
+import org.joda.time.DateTime;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.CoordinateList;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.LinearRing;
+
+/**
+ * Converts between {@link Event}s and the MongoDB {@link Document}s that store them.
+ * <p>
+ * A document holds the event subject under {@link #SUBJECT}, an optional
+ * geometry under {@link #GEO_KEY}, and either an instant under {@link #INSTANT}
+ * or an interval under {@link #INTERVAL_START} / {@link #INTERVAL_END}.
+ */
+public class EventDocumentConverter implements DocumentConverter<Event>{
+    public static final String SUBJECT = "_id";
+    public static final String GEO_KEY = "location";
+    public static final String INTERVAL_START = "start";
+    public static final String INTERVAL_END = "end";
+    public static final String INSTANT = "instant";
+
+    private final GeoMongoDBStorageStrategy geoAdapter = new GeoMongoDBStorageStrategy(0.0);
+
+    /**
+     * Converts an {@link Event} into its document form.
+     *
+     * @param event - The event to convert. (not null)
+     * @return A document holding the subject plus whichever of geometry,
+     *         instant, or interval the event carries.
+     */
+    @Override
+    public Document toDocument(final Event event) {
+        requireNonNull(event);
+
+        final Document doc = new Document();
+        doc.append(SUBJECT, event.getSubject().getData());
+
+        if(event.getGeometry().isPresent()) {
+            // geometries with more than one point and single points are encoded by different adapter calls
+            if (event.getGeometry().get().getNumPoints() > 1) {
+                doc.append(GEO_KEY, geoAdapter.getCorrespondingPoints(event.getGeometry().get()));
+            } else {
+                doc.append(GEO_KEY, geoAdapter.getDBPoint(event.getGeometry().get()));
+            }
+        }
+        if(event.isInstant()) {
+            if(event.getInstant().isPresent()) {
+                doc.append(INSTANT, event.getInstant().get().getAsDateTime().toDate());
+            }
+        } else {
+            if(event.getInterval().isPresent()) {
+                doc.append(INTERVAL_START, event.getInterval().get().getHasBeginning().getAsDateTime().toDate());
+                doc.append(INTERVAL_END, event.getInterval().get().getHasEnd().getAsDateTime().toDate());
+            }
+        }
+
+        return doc;
+    }
+
+    /**
+     * Rebuilds an {@link Event} from its document form.
+     *
+     * @param document - The document to convert. (not null)
+     * @return The event described by the document.
+     * @throws DocumentConverterException if the document has no {@link #SUBJECT} field.
+     */
+    @Override
+    public Event fromDocument(final Document document) throws DocumentConverterException {
+        requireNonNull(document);
+
+        // Preconditions.
+        if(!document.containsKey(SUBJECT)) {
+            throw new DocumentConverterException("Could not convert document '" + document +
+                    "' because its '" + SUBJECT + "' field is missing.");
+        }
+
+        final String subject = document.getString(SUBJECT);
+
+        final Event.Builder builder = new Event.Builder()
+                .setSubject(new RyaURI(subject));
+
+        if(document.containsKey(GEO_KEY)) {
+            builder.setGeometry(toGeometry((Document) document.get(GEO_KEY)));
+        }
+
+        if(document.containsKey(INSTANT)) {
+            builder.setTemporalInstant(toInstant((Date) document.get(INSTANT)));
+        } else if(document.containsKey(INTERVAL_START)){
+            final TemporalInstant begining = toInstant((Date) document.get(INTERVAL_START));
+            final TemporalInstant end = toInstant((Date) document.get(INTERVAL_END));
+
+            final TemporalInterval interval = new TemporalInterval(begining, end);
+            builder.setTemporalInterval(interval);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Decodes the stored geometry document. The "type" field selects the shape:
+     * "Point" and "LineString" are handled explicitly, any other type is read as
+     * a polygon whose first ring is the shell and whose remaining rings are holes.
+     *
+     * @param geoObj - The document stored under {@link #GEO_KEY}. (not null)
+     * @return The decoded geometry.
+     */
+    @SuppressWarnings("unchecked") // the nesting depth of "coordinates" is determined by "type"
+    private static Geometry toGeometry(final Document geoObj) {
+        final GeometryFactory geoFact = new GeometryFactory();
+        final String typeString = (String) geoObj.get("type");
+
+        if (typeString.equals("Point")) {
+            final List<Double> point = (List<Double>) geoObj.get("coordinates");
+            return geoFact.createPoint(toCoordinate(point));
+        }
+        if (typeString.equals("LineString")) {
+            final List<List<Double>> pointsList = (List<List<Double>>) geoObj.get("coordinates");
+            return geoFact.createLineString(toCoordinates(pointsList));
+        }
+
+        final List<List<List<Double>>> rings = (List<List<List<Double>>>) geoObj.get("coordinates");
+        final LinearRing shell = geoFact.createLinearRing(toCoordinates(rings.get(0)));
+
+        // Every ring after the first is a hole; each hole is built from its own points.
+        final LinearRing[] holes = new LinearRing[rings.size() - 1];
+        for(int ii = 0; ii < holes.length; ii++) {
+            holes[ii] = geoFact.createLinearRing(toCoordinates(rings.get(ii + 1)));
+        }
+        return geoFact.createPolygon(shell, holes);
+    }
+
+    /**
+     * Converts a list of stored points into a coordinate array, keeping their order.
+     */
+    private static Coordinate[] toCoordinates(final List<List<Double>> points) {
+        final Coordinate[] coords = new Coordinate[points.size()];
+        int index = 0;
+        for (final List<Double> point : points) {
+            coords[index++] = toCoordinate(point);
+        }
+        return coords;
+    }
+
+    /**
+     * Converts one stored point, using its first two values as the coordinate's x and y.
+     */
+    private static Coordinate toCoordinate(final List<Double> point) {
+        return new Coordinate(point.get(0), point.get(1));
+    }
+
+    /**
+     * Wraps a stored {@link Date} in a {@link TemporalInstant}.
+     */
+    private static TemporalInstant toInstant(final Date date) {
+        return new TemporalInstantRfc3339(new DateTime(date.getTime()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
new file mode 100644
index 0000000..1c62407
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.geotemporal.model.Event;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage;
+import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException;
+import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs update operations over an {@link EventStorage}.
+ * <p>
+ * Every operation delegates to the wrapped storage; an
+ * {@link ObjectStorageException} raised there is rethrown as an
+ * {@link EventStorageException} carrying the same message and the original
+ * exception as its cause.
+ */
+@DefaultAnnotation(NonNull.class)
+public class EventUpdater implements MongoDocumentUpdater<RyaURI, Event>{
+    private final EventStorage storage;
+
+    /**
+     * Constructs an instance of {@link EventUpdater}
+     *
+     * @param events - The storage this updater operates over. (not null)
+     */
+    public EventUpdater(final EventStorage events) {
+        storage = requireNonNull(events);
+    }
+
+    /**
+     * Looks up the event currently stored for a subject.
+     *
+     * @param key - The subject of the event to fetch.
+     * @return Whatever the storage returns for that subject.
+     * @throws EventStorageException if the storage lookup fails.
+     */
+    @Override
+    public Optional<Event> getOld(final RyaURI key) throws EventStorageException {
+        final Optional<Event> existing;
+        try {
+            existing = storage.get(key);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+        return existing;
+    }
+
+    /**
+     * Stores a new event.
+     *
+     * @param newObj - The event to create.
+     * @throws EventStorageException if the storage rejects the event.
+     */
+    @Override
+    public void create(final Event newObj) throws EventStorageException {
+        try {
+            storage.create(newObj);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * Replaces a stored event with an updated version.
+     *
+     * @param old - The event as it is currently stored.
+     * @param updated - The event that replaces it.
+     * @throws EventStorageException if the storage update fails.
+     */
+    @Override
+    public void update(final Event old, final Event updated) throws EventStorageException {
+        try {
+            storage.update(old, updated);
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * Deletes the stored event that has the same subject as the given event.
+     *
+     * @param event - The event whose subject identifies what to delete.
+     * @throws EventStorageException if the storage delete fails.
+     */
+    public void delete(final Event event) throws EventStorageException {
+        try {
+            storage.delete(event.getSubject());
+        } catch (final ObjectStorageException cause) {
+            throw new EventStorageException(cause.getMessage(), cause);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
----------------------------------------------------------------------
diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
new file mode 100644
index 0000000..7bb1c1f
--- /dev/null
+++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.geotemporal.mongo;
+
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS;
+import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END;
+import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.geo.GeoParseUtils;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy;
+import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy;
+import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy;
+import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery;
+import org.apache.rya.indexing.mongodb.geo.GmlParser;
+import org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.query.MalformedQueryException;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.vividsolutions.jts.geom.Geometry;
+import com.vividsolutions.jts.io.ParseException;
+import com.vividsolutions.jts.io.WKTReader;
+
+import jline.internal.Log;
+
+/**
+ * Storage adapter for serializing Geo Temporal statements into mongo objects.
+ * This includes adapting the {@link IndexingExpr}s for the GeoTemporal indexer.
+ */
+public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
+ private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class);
+ private static final String GEO_KEY = "location";
+ private static final String TIME_KEY = "time";
+ private final TemporalMongoDBStorageStrategy temporalStrategy;
+ private final GeoMongoDBStorageStrategy geoStrategy;
+
+ /**
+  * Creates a strategy backed by a {@link GeoMongoDBStorageStrategy}
+  * (constructed with the argument {@code 0.0}) and a
+  * {@link TemporalMongoDBStorageStrategy}.
+  */
+ public GeoTemporalMongoDBStorageStrategy() {
+     this.geoStrategy = new GeoMongoDBStorageStrategy(0.0);
+     this.temporalStrategy = new TemporalMongoDBStorageStrategy();
+ }
+
+ /**
+  * Creates the two indices this strategy uses on the supplied collection:
+  * a {@code 2dsphere} index keyed on {@code GEO_KEY} and an index on
+  * {@code TIME_KEY}.
+  *
+  * @param coll - The collection the indices are created on.
+  */
+ @Override
+ public void createIndices(final DBCollection coll) {
+     final DBObject geoIndexKeys = new BasicDBObject(GEO_KEY, "2dsphere");
+     coll.createIndex(geoIndexKeys);
+     coll.createIndex(TIME_KEY);
+ }
+
+ /**
+  * Builds the Mongo query for the supplied geo and temporal filters.
+  * <ul>
+  *   <li>Both collections empty: an empty query.</li>
+  *   <li>Only one collection non-empty: the query for that collection.</li>
+  *   <li>Both non-empty: the geo query and the temporal query joined with {@code and}.</li>
+  * </ul>
+  *
+  * @param geoFilters - The geo filter expressions. (not null)
+  * @param temporalFilters - The temporal filter expressions. (not null)
+  * @return The combined query object.
+  * @throws GeoTemporalIndexException declared by the signature.
+  */
+ public DBObject getFilterQuery(final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters) throws GeoTemporalIndexException {
+     if (geoFilters.isEmpty()) {
+         if (temporalFilters.isEmpty()) {
+             // Nothing to filter on.
+             return QueryBuilder.start().get();
+         }
+         return oneOrAnd(getTemporalObjs(temporalFilters));
+     }
+
+     final DBObject[] geoObjs = getGeoObjs(geoFilters);
+     if (temporalFilters.isEmpty()) {
+         return oneOrAnd(geoObjs);
+     }
+
+     final DBObject[] temporalObjs = getTemporalObjs(temporalFilters);
+     return QueryBuilder.start()
+             .and(oneOrAnd(geoObjs), oneOrAnd(temporalObjs))
+             .get();
+ }
+
+ /**
+  * Collapses an array of query objects into one: a single element is
+  * returned as-is, anything else is wrapped in an {@code and} query.
+  * NOTE(review): an empty array also takes the {@code and} path - confirm
+  * callers never pass one.
+  *
+  * @param dbos - The query objects to combine.
+  * @return The sole element, or an {@code and} over all elements.
+  */
+ private DBObject oneOrAnd(final DBObject[] dbos) {
+     return dbos.length == 1
+             ? dbos[0]
+             : QueryBuilder.start().and(dbos).get();
+ }
+
+ /**
+  * Serializes a statement into a Mongo object whose {@code _id} is the hash
+  * code of the statement's subject. When the object's datatype is one of the
+  * WKT/GML geo datatypes the geometry is stored under {@code GEO_KEY};
+  * otherwise the temporal strategy's time value for the object's data is
+  * stored under {@code TIME_KEY}.
+  * NOTE(review): subjects with equal hash codes map to the same {@code _id}.
+  *
+  * @param ryaStatement - The statement to serialize.
+  * @return The serialized object, or {@code null} when the geometry could
+  *         not be parsed.
+  */
+ @Override
+ public DBObject serialize(final RyaStatement ryaStatement) {
+     final BasicDBObjectBuilder document = BasicDBObjectBuilder.start("_id", ryaStatement.getSubject().hashCode());
+     final URI dataType = ryaStatement.getObject().getDataType();
+
+     final boolean isGeometry = dataType.equals(GeoConstants.GEO_AS_WKT)
+             || dataType.equals(GeoConstants.GEO_AS_GML)
+             || dataType.equals(GeoConstants.XMLSCHEMA_OGC_GML)
+             || dataType.equals(GeoConstants.XMLSCHEMA_OGC_WKT);
+
+     if (!isGeometry) {
+         // Everything that is not a geo literal is treated as temporal data.
+         document.add(TIME_KEY, temporalStrategy.getTimeValue(ryaStatement.getObject().getData()));
+         return document.get();
+     }
+
+     try {
+         final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
+         final Geometry geometry = GeoParseUtils.getGeometry(statement, new GmlParser());
+         if (geometry.getNumPoints() <= 1) {
+             document.add(GEO_KEY, geoStrategy.getDBPoint(geometry));
+         } else {
+             document.add(GEO_KEY, geoStrategy.getCorrespondingPoints(geometry));
+         }
+         return document.get();
+     } catch (final ParseException e) {
+         LOG.error("Could not create geometry for statement " + ryaStatement, e);
+         return null;
+     }
+ }
+
+ /**
+  * Converts each geo filter into a Mongo query object. The first argument of
+  * a filter is read as WKT and the filter's function selects the
+  * {@link GeoPolicy}. A filter whose geometry cannot be parsed, or whose
+  * policy is rejected by {@link #getGeoObject(Geometry, GeoPolicy)}, is
+  * logged through this class's logger and left out of the result.
+  *
+  * @param geoFilters - The geo filter expressions. (not null)
+  * @return One query object per filter that could be converted.
+  */
+ private DBObject[] getGeoObjs(final Collection<IndexingExpr> geoFilters) {
+     final List<DBObject> objs = new ArrayList<>(geoFilters.size());
+     // Filters are handled one after another, so a single reader is enough.
+     final WKTReader reader = new WKTReader();
+     for (final IndexingExpr filter : geoFilters) {
+         final GeoPolicy policy = GeoPolicy.fromURI(filter.getFunction());
+         final String geoStr = ((Value) filter.getArguments()[0]).stringValue();
+         try {
+             //This method is what is used in the GeoIndexer.
+             final Geometry geo = reader.read(geoStr);
+             objs.add(getGeoObject(geo, policy));
+         } catch (final GeoTemporalIndexException | UnsupportedOperationException | ParseException e) {
+             // Report through the class logger (as serialize does), not jline's internal console logger.
+             LOG.error("Unable to parse '" + geoStr + "'.", e);
+         }
+     }
+     return objs.toArray(new DBObject[0]);
+ }
+
+ private DBObject[] getTemporalObjs(final Collection<IndexingExpr> temporalFilters) {
+ final List<DBObject> objs = new ArrayList<>();
+ temporalFilters.forEach(filter -> {
+ final TemporalPolicy policy = TemporalPolicy.fromURI(filter.getFunction());
+ final String timeStr = ((Value) filter.getArguments()[0]).stringValue();
+ final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(timeStr);
+ if(matcher.find()) {
+ final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(timeStr);
+ if(policy == TemporalPolicy.INSTANT_AFTER_INSTANT ||
+ policy == TemporalPolicy.INSTANT_BEFORE_INSTANT ||
+ policy == TemporalPolicy.INSTANT_EQUALS_INSTANT) {
+ if(interval == null) {
+ Log.error("Cannot perform temporal interval based queries on an instant.");
+ }
+ }
+ objs.add(getTemporalObject(interval, policy));
+ } else {
+ final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(timeStr));
+ if(policy != TemporalPolicy.INSTANT_AFTER_INSTANT &&
+ policy != TemporalPolicy.INSTANT_BEFORE_INSTANT &&
+ policy != TemporalPolicy.INSTANT_EQUALS_INSTANT) {
+ Log.error("Cannot perform temporal instant based queries on an interval.");
+ }
+ objs.add(getTemporalObject(instant, policy));
+ }
+ });
+ return objs.toArray(new DBObject[]{});
+ }
+
+ private DBObject getGeoObject (final Geometry geo, final GeoPolicy policy) throws GeoTemporalIndexException {
+ switch(policy) {
+ case CONTAINS:
+ throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB.");
+ case CROSSES:
+ throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB.");
+ case DISJOINT:
+ throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB.");
+ case EQUALS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(EQUALS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case INTERSECTS:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(INTERSECTS, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ case OVERLAPS:
+ throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB.");
+ case TOUCHES:
+ throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB.");
+ case WITHIN:
+ try {
+ return geoStrategy.getQuery(new GeoQuery(WITHIN, geo));
+ } catch (final MalformedQueryException e) {
+ throw new GeoTemporalIndexException(e.getMessage(), e);
+ }
+ default:
+ return new BasicDBObject();
+ }
+ }
+
+ /**
+  * Builds the Mongo query object that compares the {@code INSTANT} field
+  * against the supplied instant: greater-than for
+  * {@code INSTANT_AFTER_INSTANT}, less-than for {@code INSTANT_BEFORE_INSTANT}
+  * and equality for {@code INSTANT_EQUALS_INSTANT}. Any other policy
+  * produces an empty object.
+  *
+  * @param instant - The instant to compare against.
+  * @param policy - The temporal policy that selects the comparison. (not null)
+  * @return The query object for the instant and policy.
+  */
+ private DBObject getTemporalObject(final TemporalInstant instant, final TemporalPolicy policy) {
+     switch(policy) {
+         case INSTANT_AFTER_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(instant.getAsDateTime().toDate())
+                     .get();
+         case INSTANT_BEFORE_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .lessThan(instant.getAsDateTime().toDate())
+                     .get();
+         case INSTANT_EQUALS_INSTANT:
+             return QueryBuilder.start(INSTANT)
+                     .is(instant.getAsDateTime().toDate())
+                     .get();
+         default:
+             // Not an instant-vs-instant policy: no constraint.
+             return new BasicDBObject();
+     }
+ }
+
+ /**
+  * Builds the Mongo query object for an interval under the given policy.
+  * <ul>
+  *   <li>{@code INSTANT_AFTER_INTERVAL}: {@code INSTANT} greater than the interval's end.</li>
+  *   <li>{@code INSTANT_BEFORE_INTERVAL}: {@code INSTANT} less than the interval's beginning.</li>
+  *   <li>{@code INSTANT_END_INTERVAL}: {@code INSTANT} equal to the interval's end.</li>
+  *   <li>{@code INSTANT_IN_INTERVAL}: {@code INSTANT} strictly between beginning and end.</li>
+  *   <li>{@code INSTANT_START_INTERVAL}: {@code INSTANT} equal to the interval's beginning.</li>
+  *   <li>{@code INTERVAL_AFTER}: {@code INTERVAL_START} greater than the interval's end.</li>
+  *   <li>{@code INTERVAL_BEFORE}: {@code INTERVAL_END} less than the interval's beginning.</li>
+  *   <li>{@code INTERVAL_EQUALS}: {@code INTERVAL_START} and {@code INTERVAL_END} equal to the
+  *       interval's beginning and end.</li>
+  * </ul>
+  * Any other policy produces an empty object.
+  *
+  * @param interval - The interval to compare against.
+  * @param policy - The temporal policy that selects the comparison. (not null)
+  * @return The query object for the interval and policy.
+  */
+ private DBObject getTemporalObject(final TemporalInterval interval, final TemporalPolicy policy) {
+     switch(policy) {
+         case INSTANT_AFTER_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_BEFORE_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_END_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .is(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_IN_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .greaterThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .lessThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INSTANT_START_INTERVAL:
+             return QueryBuilder.start(INSTANT)
+                     .is(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_AFTER:
+             return QueryBuilder.start(INTERVAL_START)
+                     .greaterThan(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_BEFORE:
+             return QueryBuilder.start(INTERVAL_END)
+                     .lessThan(interval.getHasBeginning().getAsDateTime().toDate())
+                     .get();
+         case INTERVAL_EQUALS:
+             return QueryBuilder.start(INTERVAL_START)
+                     .is(interval.getHasBeginning().getAsDateTime().toDate())
+                     .and(INTERVAL_END)
+                     .is(interval.getHasEnd().getAsDateTime().toDate())
+                     .get();
+         default:
+             // Not an interval policy: no constraint.
+             return new BasicDBObject();
+     }
+ }
+}