You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/15 20:06:49 UTC
[29/69] [abbrv] [partial] incubator-rya git commit: RYA-198 Renaming
Files
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java
new file mode 100644
index 0000000..be5fdb7
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java
@@ -0,0 +1,85 @@
+package mvm.rya.mongodb;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.openrdf.model.vocabulary.XMLSchema.ANYURI;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+
+public class SimpleMongoDBStorageStrategyTest {
+ private static final String SUBJECT = "http://subject.com";
+ private static final String PREDICATE = "http://temp.com";
+ private static final String OBJECT = "http://object.com";
+ private static final String CONTEXT = "http://context.com";
+
+ private static final RyaStatement testStatement;
+ private static final DBObject testDBO;
+ private final SimpleMongoDBStorageStrategy storageStrategy = new SimpleMongoDBStorageStrategy();
+
+ static {
+ final RyaStatementBuilder builder = new RyaStatementBuilder();
+ builder.setPredicate(new RyaURI(PREDICATE));
+ builder.setSubject(new RyaURI(SUBJECT));
+ builder.setObject(new RyaURI(OBJECT));
+ builder.setContext(new RyaURI(CONTEXT));
+ builder.setTimestamp(null);
+ testStatement = builder.build();
+
+ testDBO = new BasicDBObject();
+ testDBO.put("_id", "d5f8fea0e85300478da2c9b4e132c69502e21221");
+ testDBO.put("subject", SUBJECT);
+ testDBO.put("predicate", PREDICATE);
+ testDBO.put("object", OBJECT);
+ testDBO.put("objectType", ANYURI.stringValue());
+ testDBO.put("context", CONTEXT);
+ testDBO.put("insertTimestamp", null);
+ }
+
+ @Test
+ public void testSerializeStatementToDBO() throws RyaDAOException, MongoException, IOException {
+
+ final DBObject dbo = storageStrategy.serialize(testStatement);
+ assertEquals(testDBO, dbo);
+ }
+
+ @Test
+ public void testDeSerializeStatementToDBO() throws RyaDAOException, MongoException, IOException {
+ final RyaStatement statement = storageStrategy.deserializeDBObject(testDBO);
+ /**
+ * Since RyaStatement creates a timestamp using JVM time if the timestamp is null, we want to re-null it
+ * for this test. Timestamp is created at insert time by the Server, this test
+ * can be found in the RyaDAO.
+ */
+ statement.setTimestamp(null);
+ assertEquals(testStatement, statement);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
new file mode 100644
index 0000000..9faa595
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java
@@ -0,0 +1,296 @@
+package mvm.rya.mongodb.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Date;
+
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.util.JSON;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
+import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails;
+import mvm.rya.api.instance.RyaDetails.GeoIndexDetails;
+import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetails.ProspectorDetails;
+import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails;
+import mvm.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException;
+
+/**
+ * Tests the methods of {@link MongoDetailsAdapter}.
+ */
+public class MongoDetailsAdapterTest {
+
+ @Test
+ public void ryaDetailsToMongoTest() {
+ // Convert the Details into a Mongo DB OBject.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName("test")
+ .setRyaVersion("1")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true))
+ .setGeoIndexDetails(new GeoIndexDetails(true))
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails(new FluoDetails("fluo"))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj_0")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime(new Date(0L)))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj_1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime(new Date(1L))))
+ .setTemporalIndexDetails(new TemporalIndexDetails(true))
+ .setFreeTextDetails(new FreeTextIndexDetails(true))
+ .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L))))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L))))
+ .build();
+
+ final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details);
+
+ // Ensure it matches the expected object.
+ final DBObject expected = (DBObject) JSON.parse(
+ "{ "
+ + "instanceName : \"test\","
+ + "version : \"1\","
+ + "entityCentricDetails : true,"
+ + "geoDetails : true,"
+ + "pcjDetails : {"
+ + "enabled : true ,"
+ + "fluoName : \"fluo\","
+ + "pcjs : [ "
+ + "{"
+ + "id : \"pcj_0\","
+ + "updateStrategy : \"BATCH\","
+ + "lastUpdate : { $date : \"1970-01-01T00:00:00.000Z\"}"
+ + "},"
+ + "{"
+ + "id : \"pcj_1\","
+ + "updateStrategy : \"BATCH\","
+ + "lastUpdate : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "}]"
+ + "},"
+ + "temporalDetails : true,"
+ + "freeTextDetails : true,"
+ + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "}"
+ );
+
+ assertEquals(expected.toString(), actual.toString());
+ }
+
+ @Test
+ public void mongoToRyaDetailsTest() throws MalformedRyaDetailsException {
+ // Convert the Mongo object into a RyaDetails.
+ final BasicDBObject mongo = (BasicDBObject) JSON.parse(
+ "{ "
+ + "instanceName : \"test\","
+ + "version : \"1\","
+ + "entityCentricDetails : true,"
+ + "geoDetails : true,"
+ + "pcjDetails : {"
+ + "enabled : true ,"
+ + "fluoName : \"fluo\","
+ + "pcjs : [ "
+ + "{"
+ + "id : \"pcj_0\","
+ + "updateStrategy : \"BATCH\","
+ + "lastUpdate : { $date : \"1970-01-01T00:00:00.000Z\"}"
+ + "},"
+ + "{"
+ + "id : \"pcj_1\","
+ + "updateStrategy : \"BATCH\","
+ + "lastUpdate : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "}]"
+ + "},"
+ + "temporalDetails : true,"
+ + "freeTextDetails : true,"
+ + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"},"
+ + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}"
+ + "}"
+ );
+
+ final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo);
+
+ // Ensure it matches the expected object.
+ final RyaDetails expected = RyaDetails.builder()
+ .setRyaInstanceName("test")
+ .setRyaVersion("1")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true))
+ .setGeoIndexDetails(new GeoIndexDetails(true))
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails(new FluoDetails("fluo"))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj_0")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime(new Date(0L)))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj_1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime(new Date(1L))))
+ .setTemporalIndexDetails(new TemporalIndexDetails(true))
+ .setFreeTextDetails(new FreeTextIndexDetails(true))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L))))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L))))
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void absentOptionalToRyaDetailsTest() throws MalformedRyaDetailsException {
+ // Convert the Mongo object into a RyaDetails.
+ final BasicDBObject mongo = (BasicDBObject) JSON.parse(
+ "{ "
+ + "instanceName : \"test\","
+ + "version : \"1\","
+ + "entityCentricDetails : true,"
+ + "geoDetails : false,"
+ + "pcjDetails : {"
+ + "enabled : false,"
+ + "fluoName : \"fluo\","
+ + "pcjs : [ "
+ + "{"
+ + "id : \"pcj_1\","
+ + "}"
+ + "]"
+ + "},"
+ + "temporalDetails : false,"
+ + "freeTextDetails : true,"
+ + "prospectorDetails : null,"
+ + "joinSelectivitiyDetails : null"
+ + "}"
+ );
+ final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo);
+
+ // Ensure it matches the expected object.
+ final RyaDetails expected = RyaDetails.builder()
+ .setRyaInstanceName("test")
+ .setRyaVersion("1")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true))
+ .setGeoIndexDetails(new GeoIndexDetails(false))
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(false)
+ .setFluoDetails(new FluoDetails("fluo"))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj_1")
+ .setLastUpdateTime(null)))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(true))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent()))
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void absentOptionalToMongoTest() {
+ // Convert the Details into a Mongo DB OBject.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName("test")
+ .setRyaVersion("1")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true))
+ .setGeoIndexDetails(new GeoIndexDetails(false))
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails(new FluoDetails("fluo")))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(true))
+ .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent()))
+ .build();
+
+ final DBObject actual = MongoDetailsAdapter.toDBObject(details);
+
+ // Ensure it matches the expected object.
+ final BasicDBObject expected = (BasicDBObject) JSON.parse(
+ "{ "
+ + "instanceName : \"test\","
+ + "version : \"1\","
+ + "entityCentricDetails : true,"
+ + "geoDetails : false,"
+ + "pcjDetails : {"
+ + "enabled : true,"
+ + "fluoName : \"fluo\","
+ + "pcjs : [ ]"
+ + "},"
+ + "temporalDetails : false,"
+ + "freeTextDetails : true"
+ + "}"
+ );
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void toDBObject_pcjDetails() {
+ final PCJDetails details = PCJDetails.builder()
+ .setId("pcjId")
+ .setLastUpdateTime( new Date() )
+ .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL )
+ .build();
+
+ // Convert it into a Mongo DB Object.
+ final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details);
+
+ // Convert the dbo back into the original object.
+ final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build();
+
+ // Ensure the restored value matches the original.
+ assertEquals(details, restored);
+ }
+
+ @Test
+ public void toDBObject_pcjDetails_missing_optionals() {
+ final PCJDetails details = PCJDetails.builder()
+ .setId("pcjId")
+ .build();
+
+ // Convert it into a Mongo DB Object.
+ final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details);
+
+ // Convert the dbo back into the original object.
+ final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build();
+
+ // Ensure the restored value matches the original.
+ assertEquals(details, restored);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java
new file mode 100644
index 0000000..2ce2e93
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java
@@ -0,0 +1,307 @@
+package mvm.rya.mongodb.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+
+import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
+import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails;
+import mvm.rya.api.instance.RyaDetails.GeoIndexDetails;
+import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetails.ProspectorDetails;
+import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.ConcurrentUpdateException;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+
+/**
+ * Tests the methods of {@link AccumuloRyaDetailsRepository} by using a {@link MiniAccumuloCluster}.
+ */
+public class MongoRyaDetailsRepositoryIT {
+
+ private static MongoClient client = null;
+
+ @BeforeClass
+ public static void startMiniAccumulo() throws MongoException, IOException {
+ final MongodForTestsFactory mongoFactory = new MongodForTestsFactory();
+ client = mongoFactory.newMongo();
+ }
+
+ @Before
+ public void clearLastTest() {
+ client.dropDatabase("testInstance");
+ }
+
+ @AfterClass
+ public static void stopMiniAccumulo() throws IOException, InterruptedException {
+ client.close();
+ }
+
+ @Test
+ public void initializeAndGet() throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ final String instanceName = "testInstance";
+
+ // Create the metadata object the repository will be initialized with.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName(instanceName)
+ .setRyaVersion("1.2.3.4")
+ .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
+ .setGeoIndexDetails( new GeoIndexDetails(true) )
+ .setTemporalIndexDetails( new TemporalIndexDetails(true) )
+ .setFreeTextDetails( new FreeTextIndexDetails(true) )
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") )
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime( new Date() ))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 2")))
+ .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .build();
+
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, instanceName);
+
+ // Initialize the repository
+ repo.initialize(details);
+
+ // Fetch the stored details.
+ final RyaDetails stored = repo.getRyaInstanceDetails();
+
+ // Ensure the fetched object is equivalent to what was stored.
+ assertEquals(details, stored);
+ }
+
+ @Test(expected = AlreadyInitializedException.class)
+ public void initialize_alreadyInitialized() throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ final String instanceName = "testInstance";
+
+ // Create the metadata object the repository will be initialized with.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName(instanceName)
+ .setRyaVersion("1.2.3.4")
+ .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
+ .setGeoIndexDetails( new GeoIndexDetails(true) )
+ .setTemporalIndexDetails( new TemporalIndexDetails(true) )
+ .setFreeTextDetails( new FreeTextIndexDetails(true) )
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") )
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime( new Date() ))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 2")))
+ .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .build();
+
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, instanceName);
+
+ // Initialize the repository
+ repo.initialize(details);
+
+ // Initialize it again.
+ repo.initialize(details);
+ }
+
+ @Test(expected = NotInitializedException.class)
+ public void getRyaInstance_notInitialized() throws NotInitializedException, RyaDetailsRepositoryException {
+ // Setup the repository that will be tested using a mock instance of Accumulo.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance");
+
+ // Try to fetch the details from the uninitialized repository.
+ repo.getRyaInstanceDetails();
+ }
+
+ @Test
+ public void isInitialized_true() throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ final String instanceName = "testInstance";
+
+ // Create the metadata object the repository will be initialized with.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName(instanceName)
+ .setRyaVersion("1.2.3.4")
+ .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
+ .setGeoIndexDetails( new GeoIndexDetails(true) )
+ .setTemporalIndexDetails( new TemporalIndexDetails(true) )
+ .setFreeTextDetails( new FreeTextIndexDetails(true) )
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") )
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime( new Date() ))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 2")))
+ .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .build();
+
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance");
+
+ // Initialize the repository
+ repo.initialize(details);
+
+ // Ensure the repository reports that it has been initialized.
+ assertTrue( repo.isInitialized() );
+ }
+
+ @Test
+ public void isInitialized_false() throws RyaDetailsRepositoryException {
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance");
+
+ // Ensure the repository reports that is has not been initialized.
+ assertFalse( repo.isInitialized() );
+ }
+
+ @Test
+ public void update() throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ final String instanceName = "testInstance";
+
+ // Create the metadata object the repository will be initialized with.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName(instanceName)
+ .setRyaVersion("1.2.3.4")
+ .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
+ .setGeoIndexDetails( new GeoIndexDetails(true) )
+ .setTemporalIndexDetails( new TemporalIndexDetails(true) )
+ .setFreeTextDetails( new FreeTextIndexDetails(true) )
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") )
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime( new Date() ))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 2")))
+ .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .build();
+
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance");
+
+ // Initialize the repository
+ repo.initialize(details);
+
+ // Create a new state for the details.
+ final RyaDetails updated = new RyaDetails.Builder( details )
+ .setGeoIndexDetails( new GeoIndexDetails(false) )
+ .build();
+
+ // Execute the update.
+ repo.update(details, updated);
+
+ // Show the new state that is stored matches the updated state.
+ final RyaDetails fetched = repo.getRyaInstanceDetails();
+ assertEquals(updated, fetched);
+ }
+
+ @Test(expected = ConcurrentUpdateException.class)
+ public void update_outOfDate() throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ final String instanceName = "testInstance";
+
+ // Create the metadata object the repository will be initialized with.
+ final RyaDetails details = RyaDetails.builder()
+ .setRyaInstanceName(instanceName)
+ .setRyaVersion("1.2.3.4")
+ .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
+ .setGeoIndexDetails( new GeoIndexDetails(true) )
+ .setTemporalIndexDetails( new TemporalIndexDetails(true) )
+ .setFreeTextDetails( new FreeTextIndexDetails(true) )
+ .setPCJIndexDetails(
+ PCJIndexDetails.builder()
+ .setEnabled(true)
+ .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") )
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 1")
+ .setUpdateStrategy(PCJUpdateStrategy.BATCH)
+ .setLastUpdateTime( new Date() ))
+ .addPCJDetails(
+ PCJDetails.builder()
+ .setId("pcj 2")))
+ .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) )
+ .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) )
+ .build();
+
+ // Setup the repository that will be tested using a mock instance of MongoDB.
+ final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance");
+
+ // Initialize the repository
+ repo.initialize(details);
+
+ // Create a new state for the details.
+ final RyaDetails wrongOriginal = new RyaDetails.Builder( details )
+ .setTemporalIndexDetails( new TemporalIndexDetails(false) )
+ .build();
+
+ final RyaDetails updated = new RyaDetails.Builder( details )
+ .setGeoIndexDetails( new GeoIndexDetails(false) )
+ .build();
+
+ // Try to execute the update where the old state is not the currently stored state.
+ repo.update(wrongOriginal, updated);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
deleted file mode 100644
index fefd651..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package mvm.rya.accumulo.documentIndex;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-public class DocIndexIteratorUtil {
-
-
-
- public static final String DOC_ID_INDEX_DELIM = "\u001D" + "\u001E";
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
deleted file mode 100644
index ad38b2b..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
+++ /dev/null
@@ -1,850 +0,0 @@
-package mvm.rya.accumulo.documentIndex;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.ArrayByteSequence;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-/**
- * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
- * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
- *
- * The table structure should have the following form:
- *
- * row: shardID, colfam: term, colqual: docID
- *
- * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
- * result will have an empty column family, as follows:
- *
- * row: shardID, colfam: (empty), colqual: docID
- *
- * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
- *
- * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes
- * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
- *
- * README.shard in docs/examples shows an example of using the IntersectingIterator.
- */
-public class DocumentIndexIntersectingIterator implements SortedKeyValueIterator<Key,Value> {
-
-
-
-
- protected Text nullText = new Text();
-
- protected Text getRow(Key key) {
- return key.getRow();
- }
-
- protected Text getTerm(Key key) {
- return key.getColumnFamily();
- }
-
- protected Text getTermCond(Key key) {
- return key.getColumnQualifier();
- }
-
- protected Key buildKey(Text row, TextColumn column) {
- return new Key(row, (column.getColumnFamily() == null) ? nullText: column.getColumnFamily(), column.getColumnQualifier());
- }
-
- protected Key buildKey(Text row, Text term) {
- return new Key(row, (term == null) ? nullText : term);
- }
-
- protected Key buildKey(Text row, Text term, Text termCond) {
- return new Key(row, (term == null) ? nullText : term, termCond);
- }
-
- protected Key buildFollowRowKey(Key key, Text term, Text termCond) {
- return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? nullText : term, termCond);
- }
-
- protected static final Logger log = Logger.getLogger(DocumentIndexIntersectingIterator.class);
-
- public static class TermSource {
- public SortedKeyValueIterator<Key, Value> iter;
- public Text term;
- public Text termCond;
- public Collection<ByteSequence> seekColfams;
- public TextColumn column;
- public boolean isPrefix;
- public Key top ;
- public Key next ;
- public Text currentCQ;
- private boolean seeked = false;
-
- public TermSource(TermSource other) {
-
- this.iter = other.iter;
- this.term = other.term;
- this.termCond = other.termCond;
- this.seekColfams = other.seekColfams;
- this.column = other.column;
- this.top = other.top;
- this.next = other.next;
- this.currentCQ = other.currentCQ;
- this.isPrefix = other.isPrefix;
- }
-
-
- public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) {
-
- this.iter = iter;
- this.column = column;
- this.term = column.getColumnFamily();
- this.termCond = column.getColumnQualifier();
- this.currentCQ = new Text(emptyByteArray);
- this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term
- .getBytes(), 0, term.getLength()));
-
- }
-
-
-
- public void seek(Range r) throws IOException {
-
- if (seeked) {
-
- if (next != null && !r.beforeStartKey(next)) {
- if (next.getColumnFamily().equals(term)) {
- this.updateTop();
- }
- } else if (iter.hasTop()) {
- iter.seek(r, seekColfams, true);
- this.updateTopNext();
- } else {
- top = null;
- next = null;
-
- }
- } else {
-
- iter.seek(r, seekColfams, true);
- this.updateTopNext();
- seeked = true;
- }
-
- }
-
-
- public void next() throws IOException {
-
- this.updateTop();
- }
-
- public void updateTop() throws IOException {
-
- top = next;
- if (next != null) {
- iter.next();
- if (iter.hasTop()) {
- next = iter.getTopKey();
- } else {
- next = null;
- }
- }
-
- }
-
- public void updateTopNext() throws IOException {
-
- if (iter.hasTop()) {
- top = iter.getTopKey();
- } else {
- top = null;
- next = null;
- return;
- }
-
- iter.next();
-
- if(iter.hasTop()) {
- next = iter.getTopKey();
- } else {
- next = null;
- }
- }
-
- public boolean hasTop() {
- return top != null;
- }
-
-
- public String getTermString() {
- return (this.term == null) ? new String("Iterator") : this.term.toString();
- }
- }
-
- TermSource[] sources;
- int sourcesCount = 0;
- Range overallRange;
-
- // query-time settings
- protected Text currentRow = null;
- protected Text currentTermCond = new Text(emptyByteArray);
- static final byte[] emptyByteArray = new byte[0];
-
- protected Key topKey = null;
- protected Value value = new Value(emptyByteArray);
- protected String ctxt = null;
- protected boolean hasContext = false;
- protected boolean termCondSet = false;
-
- public DocumentIndexIntersectingIterator() {}
-
- @Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- //log.info("Calling deep copy on " + this);
- return new DocumentIndexIntersectingIterator(this, env);
- }
-
- private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) {
- if (other.sources != null) {
- sourcesCount = other.sourcesCount;
- sources = new TermSource[sourcesCount];
- for (int i = 0; i < sourcesCount; i++) {
- sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column);
- }
- }
- }
-
- @Override
- public Key getTopKey() {
-
- return topKey;
- }
-
- @Override
- public Value getTopValue() {
- // we don't really care about values
- return value;
- }
-
- @Override
- public boolean hasTop() {
- return currentRow != null;
- }
-
- // precondition: currentRow is not null
- private boolean seekOneSource(int sourceID) throws IOException {
- // find the next key in the appropriate column family that is at or
- // beyond the cursor (currentRow, currentCQ)
- // advance the cursor if this source goes beyond it
- // return whether we advanced the cursor
-
- // within this loop progress must be made in one of the following forms:
- // - currentRow or currentCQ must be increased
- // - the given source must advance its iterator
- // this loop will end when any of the following criteria are met
- // - the iterator for the given source is pointing to the key
- // (currentRow, columnFamilies[sourceID], currentCQ)
- // - the given source is out of data and currentRow is set to null
- // - the given source has advanced beyond the endRow and currentRow is
- // set to null
- boolean advancedCursor = false;
-
-
-
-
-
- while (true) {
-
-// if(currentRow.toString().equals(s)) {
-// log.info("Source id is " + sourceID);
-// if (sources[sourceID].top != null) {
-// log.info("Top row is " + getRow(sources[sourceID].top));
-// log.info("Top cq is " + getTermCond(sources[sourceID].top));
-// }
-// if (sources[sourceID].next != null) {
-// log.info("Next row is " + getRow(sources[sourceID].next));
-// log.info("Next termCond is " + getTermCond(sources[sourceID].next));
-// }
-// }
-
- if (sources[sourceID].hasTop() == false) {
- currentRow = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- // check if we're past the end key
- int endCompare = -1;
- // we should compare the row to the end of the range
-
- if (overallRange.getEndKey() != null) {
- endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow());
- if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
- currentRow = null;
- // setting currentRow to null counts as advancing the cursor
- return true;
- }
- }
-
-
-
- int rowCompare = currentRow.compareTo(getRow(sources[sourceID].top));
- // check if this source is already at or beyond currentRow
- // if not, then seek to at least the current row
-
-
-
- if (rowCompare > 0) {
- // seek to at least the currentRow
- Key seekKey = buildKey(currentRow, sources[sourceID].term);
- sources[sourceID].seek(new Range(seekKey, true, null, false));
-
- continue;
- }
- // check if this source has gone beyond currentRow
- // if so, advance currentRow
- if (rowCompare < 0) {
- currentRow.set(getRow(sources[sourceID].top));
- //log.info("Current row is " + currentRow);
- advancedCursor = true;
- continue;
- }
- // we have verified that the current source is positioned in
- // currentRow
- // now we must make sure we're in the right columnFamily in the
- // current row
- // Note: Iterators are auto-magically set to the correct
- // columnFamily
-
- if (sources[sourceID].column.isValid()) {
-
- boolean isPrefix = false;
- boolean contextEqual = false;
- String tempContext = "";
-
- int termCompare;
-
- String[] cQ = getTermCond(sources[sourceID].top).toString().split("\u0000");
- tempContext = cQ[0];
-
- if (!hasContext && ctxt == null) {
- ctxt = cQ[0];
- }
-
- contextEqual = ctxt.equals(cQ[0]);
-
- String s1 = sources[sourceID].termCond.toString();
- String s2 = cQ[1] + "\u0000" + cQ[2];
-
- if (sources[sourceID].isPrefix) {
- isPrefix = s2.startsWith(s1 + "\u0000");
- } else {
- isPrefix = s2.startsWith(s1);
- }
-
- termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + "\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2);
-
- // if(currentRow.toString().equals(s)) {
- // log.info("Term compare is " + termCompare);
- // }
-
- // check if this source is already on the right columnFamily
- // if not, then seek forwards to the right columnFamily
- if (termCompare > 0) {
- Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt +
- "\u0000" + sources[sourceID].termCond.toString()));
- sources[sourceID].seek(new Range(seekKey, true, null, false));
-
- continue;
- }
- // check if this source is beyond the right columnFamily
- // if so, then seek to the next row
- if (termCompare < 0) {
- // we're out of entries in the current row, so seek to the
- // next one
-
- if (endCompare == 0) {
- // we're done
- currentRow = null;
- // setting currentRow to null counts as advancing the
- // cursor
- return true;
- }
-
-
-
- //advance to next row if context set - all entries in given row exhausted
- if (hasContext || tempContext.length() == 0) {
- Key seekKey = buildFollowRowKey(sources[sourceID].top, sources[sourceID].term,
- new Text(ctxt + "\u0000" + sources[sourceID].termCond.toString()));
- sources[sourceID].seek(new Range(seekKey, true, null, false));
- } else {
-
- if(contextEqual && !isPrefix) {
- Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + "\u0001"));
- sources[sourceID].seek(new Range(seekKey, true, null, false));
- if(sources[sourceID].top != null) {
- ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
- }
- } else {
- Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(tempContext +
- "\u0000" + sources[sourceID].termCond.toString()));
- sources[sourceID].seek(new Range(seekKey, true, null, false));
- if(sources[sourceID].top != null) {
- ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
- }
- }
-
- }
-
-
-// if(currentRow.toString().equals(s)) {
-// log.info("current term cond is " + currentTermCond);
-//
-// }
-
-
- continue;
- }
- }
-
-
-
-
-
-
-
-
-
-
- //set currentTermCond -- gets appended to end of currentKey column qualifier
- //used to determine which term iterator to advance when a new iterator is created
-
- sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top));
-
- if (sources[sourceID].next != null) {
-
- //is hasContext, only consider sourceID with next having designated context
- //otherwise don't set currentTermCond
- if (!termCondSet && hasContext) {
- if (sources[sourceID].next.getRow().equals(currentRow)
- && sources[sourceID].next.getColumnQualifier().toString()
- .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) {
- currentTermCond.set(new Text(Integer.toString(sourceID)));
- termCondSet = true;
- }
- } else if(!termCondSet){
- String[] cq = getTermCond(sources[sourceID].next).toString().split("\u0000");
-
- //set currentTermCond with preference given to sourceID having next with same context
- //otherwise set currentTermCond sourceID with next having termCond as prefix
- if (sources[sourceID].next.getRow().equals(currentRow)) {
- if (sources[sourceID].next.getColumnQualifier().toString()
- .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) {
- currentTermCond.set(new Text(Integer.toString(sourceID)));
- termCondSet = true;
- } else if ((cq[1] + "\u0000" + cq[2]).startsWith(sources[sourceID].termCond.toString())) {
- currentTermCond.set(new Text(Integer.toString(sourceID)));
- }
- }
- }
- }
-
-
- break;
- }
-
- return advancedCursor;
- }
-
- @Override
- public void next() throws IOException {
- if (currentRow == null) {
- return;
- }
-
-
-
- if(currentTermCond.getLength() != 0) {
-
- int id = Integer.parseInt(currentTermCond.toString());
-
- sources[id].next();
- currentTermCond.set(emptyByteArray);
- termCondSet = false;
- if(sources[id].top != null && !hasContext) {
- ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0];
- }
- advanceToIntersection();
- return;
- }
-
- sources[0].next();
- if(sources[0].top != null && !hasContext) {
- ctxt = getTermCond(sources[0].top).toString().split("\u0000")[0];
- }
- advanceToIntersection();
- }
-
- protected void advanceToIntersection() throws IOException {
- boolean cursorChanged = true;
- while (cursorChanged) {
- // seek all of the sources to at least the highest seen column qualifier in the current row
- cursorChanged = false;
- for (int i = 0; i < sourcesCount; i++) {
-// log.info("New sourceID is " + i);
- if (currentRow == null) {
- topKey = null;
- return;
- }
- if (seekOneSource(i)) {
- currentTermCond.set(emptyByteArray);
- termCondSet = false;
- cursorChanged = true;
- break;
- }
- }
- }
- String cq = "";
- for(int i = 0; i < sourcesCount; i++) {
- cq = cq + sources[i].currentCQ.toString() + DocIndexIteratorUtil.DOC_ID_INDEX_DELIM;
- }
-
- if (currentTermCond.getLength() == 0) {
- topKey = buildKey(currentRow, nullText, new Text(cq + -1));
- } else {
- topKey = buildKey(currentRow, nullText, new Text(cq + currentTermCond.toString()));
- }
- }
-
- public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
- if (iter.hasTop())
- return iter.getTopKey().toString();
- return "";
- }
-
- private static final String columnOptionName = "columns";
- private static final String columnPrefix = "prefixes";
- private static final String context = "context";
-
-
-
- protected static String encodeColumns(TextColumn[] columns) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < columns.length; i++) {
- sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily()))));
- sb.append('\n');
- sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier()))));
- sb.append('\u0001');
- }
- return sb.toString();
- }
-
-
-
- protected static TextColumn[] decodeColumns(String columns) {
- String[] columnStrings = columns.split("\u0001");
- TextColumn[] columnTexts = new TextColumn[columnStrings.length];
- for (int i = 0; i < columnStrings.length; i++) {
- String[] columnComponents = columnStrings[i].split("\n");
- columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())),
- new Text(Base64.decodeBase64(columnComponents[1].getBytes())));
- }
- return columnTexts;
- }
-
-
-
-
-
- /**
- * @param context
- * @return encoded context
- */
- protected static String encodeContext(String context) {
-
- return new String(Base64.encodeBase64(context.getBytes()));
- }
-
-
-
- /**
- * @param context
- * @return decoded context
- */
- protected static String decodeContext(String context) {
-
- if (context == null) {
- return null;
- } else {
- return new String(Base64.decodeBase64(context.getBytes()));
- }
- }
-
-
-
-
-
- protected static String encodeBooleans(boolean[] prefixes) {
- byte[] bytes = new byte[prefixes.length];
- for (int i = 0; i < prefixes.length; i++) {
- if (prefixes[i])
- bytes[i] = 1;
- else
- bytes[i] = 0;
- }
- return new String(Base64.encodeBase64(bytes));
- }
-
- /**
- * @param flags
- * @return decoded flags
- */
- protected static boolean[] decodeBooleans(String prefixes) {
- // return null of there were no flags
- if (prefixes == null)
- return null;
-
- byte[] bytes = Base64.decodeBase64(prefixes.getBytes());
- boolean[] bFlags = new boolean[bytes.length];
- for (int i = 0; i < bytes.length; i++) {
- if (bytes[i] == 1)
- bFlags[i] = true;
- else
- bFlags[i] = false;
- }
- return bFlags;
- }
-
-
-
-
-
-
-
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- TextColumn[] terms = decodeColumns(options.get(columnOptionName));
- boolean[] prefixes = decodeBooleans(options.get(columnPrefix));
- ctxt = decodeContext(options.get(context));
-
- if(ctxt != null) {
- hasContext = true;
- }
-
-
-
- if (terms.length < 2) {
- throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
- }
-
- sources = new TermSource[terms.length];
- sources[0] = new TermSource(source, terms[0]);
- for (int i = 1; i < terms.length; i++) {
- //log.info("For decoded column " + i + " column family is " + terms[i].getColumnFamily() + " and qualifier is " + terms[i].getColumnQualifier());
- sources[i] = new TermSource(source.deepCopy(env), terms[i]);
- sources[i].isPrefix = prefixes[i];
- }
- sourcesCount = terms.length;
- }
-
- @Override
- public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
- overallRange = new Range(range);
- currentRow = new Text();
- currentTermCond.set(emptyByteArray);
- termCondSet = false;
-
-
-
-// log.info("Calling seek with range " + range);
-
- // seek each of the sources to the right column family within the row
- // given by key
-
- Key sourceKey;
-
- if (rangeCqValid(range)) {
-
- String[] cqInfo = cqParser(range.getStartKey().getColumnQualifier());
- int id = Integer.parseInt(cqInfo[1]);
-
-
-
- if (id >= 0) {
- for (int i = 0; i < sourcesCount; i++) {
-
- if (i == id) {
- sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, new Text(cqInfo[0]));
- sources[i].seek(new Range(sourceKey, true, null, false));
- sources[i].next();
- if(!hasContext && sources[i].hasTop()) {
- ctxt = getTermCond(sources[i].top).toString().split("\u0000")[0];
- }
- } else {
- sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term);
- sources[i].seek(new Range(sourceKey, true, null, false));
- }
- }
- } else {
-
-
- for (int i = 0; i < sourcesCount; i++) {
- sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, range.getStartKey()
- .getColumnQualifier());
- sources[i].seek(new Range(sourceKey, true, null, false));
- }
- }
-
-
- } else {
-
-// log.info("Range is invalid.");
- for (int i = 0; i < sourcesCount; i++) {
-
- if (range.getStartKey() != null) {
-
- sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term);
-
- // Seek only to the term for this source as a column family
- sources[i].seek(new Range(sourceKey, true, null, false));
- } else {
- // Seek only to the term for this source as a column family
-
- sources[i].seek(range);
- }
- }
- }
-
- advanceToIntersection();
-
- }
-
-
- private String[] cqParser(Text cq) {
-
- String cQ = cq.toString();
- String[] cqComponents = cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
- int id = -1;
- String[] valPos = new String[2];
-
-
-
-
- if(cqComponents.length > 1) {
- id = Integer.parseInt(cqComponents[cqComponents.length-1]);
- if (id >= 0) {
- valPos[0] = cqComponents[id].toString();
- valPos[1] = "" + id;
- } else {
- valPos[0] = cqComponents[0].toString();
- valPos[1] = "" + id;
- }
- } else {
- valPos[0] = cq.toString();
- valPos[1] = "" + -1;
- }
-
- return valPos;
-
- }
-
-
- private boolean rangeCqValid(Range range) {
- return (range.getStartKey() != null) && (range.getStartKey().getColumnQualifier() != null);
- }
-
-
-
- public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) {
- // Check if we have space for the added Source
- if (sources == null) {
- sources = new TermSource[1];
- } else {
- // allocate space for node, and copy current tree.
- // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309
- TermSource[] localSources = new TermSource[sources.length + 1];
- int currSource = 0;
- for (TermSource myTerm : sources) {
- // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309
- localSources[currSource] = new TermSource(myTerm);
- currSource++;
- }
- sources = localSources;
- }
- sources[sourcesCount] = new TermSource(source.deepCopy(env), column);
- sourcesCount++;
- }
-
- /**
- * Encode the columns to be used when iterating.
- *
- * @param cfg
- * @param columns
- */
- public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] columns) {
- if (columns.length < 2)
- throw new IllegalArgumentException("Must supply at least two terms to intersect");
-
- boolean[] prefix = new boolean[columns.length];
-
- for(int i = 0; i < columns.length; i++) {
- prefix[i] = columns[i].isPrefix();
- }
-
-
-
- cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, DocumentIndexIntersectingIterator.encodeBooleans(prefix));
- cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, DocumentIndexIntersectingIterator.encodeColumns(columns));
- }
-
-
-
-
-
- public static void setContext(IteratorSetting cfg, String context) {
-
- cfg.addOption(DocumentIndexIntersectingIterator.context, DocumentIndexIntersectingIterator.encodeContext(context));
-
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
deleted file mode 100644
index 661f62b..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package mvm.rya.accumulo.documentIndex;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.hadoop.io.Text;
-
-public class TextColumn {
-
-
- private Text columnFamily;
- private Text columnQualifier;
- private boolean isPrefix = false;
-
-
-
- public TextColumn(Text columnFamily, Text columnQualifier) {
- this.columnFamily = columnFamily;
- this.columnQualifier = columnQualifier;
- }
-
-
- public TextColumn(TextColumn other) {
-
- this.columnFamily = new Text(other.columnFamily);
- this.columnQualifier = new Text(other.columnQualifier);
- this.isPrefix = other.isPrefix;
-
- }
-
-
- public Text getColumnFamily() {
- return columnFamily;
- }
-
-
- public boolean isPrefix() {
- return isPrefix;
- }
-
-
- public void setIsPrefix(boolean isPrefix) {
- this.isPrefix = isPrefix;
- }
-
-
- public boolean isValid() {
- return (columnFamily != null && columnQualifier != null);
- }
-
-
-
- public Text getColumnQualifier() {
- return columnQualifier;
- }
-
-
- public void setColumnFamily(Text cf) {
- this.columnFamily = cf;
- }
-
- public void setColumnQualifier(Text cq) {
- this.columnQualifier = cq;
- }
-
- public String toString() {
-
- return columnFamily.toString() + ", " + columnQualifier.toString() + ", prefix:" + isPrefix;
- }
-
- @Override
- public boolean equals(Object other) {
-
- if(other == null) {
- return false;
- }
-
- if(!(other instanceof TextColumn)) {
- return false;
- }
-
- TextColumn tc = (TextColumn) other;
-
- return this.columnFamily.equals(tc.columnFamily) && this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix;
-
-
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
deleted file mode 100644
index 0966903..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
+++ /dev/null
@@ -1,324 +0,0 @@
-package mvm.rya.accumulo.pcj.iterators;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
-
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-/**
- * This {@link CloseableIteration} performs a hash join by joining each
- * {@link Map.Entry<String, BindingSet>} with all corresponding
- * {@link BindingSet} in a Multimap with the same String key.
- *
- */
-public class BindingSetHashJoinIterator implements
- CloseableIteration<BindingSet, QueryEvaluationException> {
-
- //BindingSets passed to PCJ mapped according to values
- //associated with common variables with table
- private Multimap<String, BindingSet> bindingJoinVarHash;
- //BindingSets taken from PCJ table
- private CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter;
- private Iterator<BindingSet> joinedBindingSets = Collections
- .emptyIterator();
- //If PCJ contains LeftJoin, this is a set of variable in LeftJoin. Used when performing Join.
- private Set<String> unAssuredVariables;
- //indicates when HashJoin formed from a single collection of join variable or if the size and
- //collection of join variables varies -- this is to optimize the join process
- private HashJoinType type;
- private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet();
- private BindingSet next;
- private boolean hasNextCalled = false;
- private boolean isEmpty = false;
-
- /**
- * Enum type to indicate whether HashJoin will be performed over a fixed
- * subset of variables common to each {@link BindingSet}, or if there is a
- * collection of variable subsets over which to join.
- *
- */
- public enum HashJoinType {
- CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR
- };
-
- public BindingSetHashJoinIterator(
- Multimap<String, BindingSet> bindingJoinVarHash,
- CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter,
- Set<String> unAssuredVariables, HashJoinType type) {
- this.bindingJoinVarHash = bindingJoinVarHash;
- this.joinIter = joinIter;
- this.type = type;
- this.unAssuredVariables = unAssuredVariables;
- }
-
- @Override
- public boolean hasNext() throws QueryEvaluationException {
- if (!hasNextCalled && !isEmpty) {
- while (joinedBindingSets.hasNext() || joinIter.hasNext()) {
- if (!joinedBindingSets.hasNext()) {
- Entry<String, BindingSet> entry = joinIter.next();
- joinedBindingSets = joinBindingSetEntry(entry);
- }
- if (!joinedBindingSets.hasNext()) {
- continue;
- }
- next = joinedBindingSets.next();
- hasNextCalled = true;
- return true;
- }
-
- isEmpty = true;
- return false;
- } else if (isEmpty) {
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public BindingSet next() throws QueryEvaluationException {
- if (hasNextCalled) {
- hasNextCalled = false;
- } else if (isEmpty) {
- throw new NoSuchElementException();
- } else {
- if (this.hasNext()) {
- hasNextCalled = false;
- } else {
- throw new NoSuchElementException();
- }
- }
- return next;
- }
-
- @Override
- public void remove() throws QueryEvaluationException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws QueryEvaluationException {
- joinIter.close();
- }
-
- /**
- * This method takes the valOrderString, which is a key used for computing
- * hash joins, and generates multiple keys by pulling off one delimiter
- * separated component at a time. This is used when the size of the join key
- * varies from {@link Map.Entry} to Entry. It allows the BindingSet to be
- * joined using all prefixes of the key.
- *
- * @param valOrderString
- * - key used for hash join
- * @return
- */
- private List<String> getValueOrders(String valOrderString) {
-
- List<String> valueOrders = new ArrayList<>();
- String[] splitValOrderString = valOrderString
- .split(ExternalTupleSet.VALUE_DELIM);
- StringBuffer buffer = new StringBuffer();
- buffer.append(splitValOrderString[0]);
- valueOrders.add(buffer.substring(0));
-
- for (int i = 1; i < splitValOrderString.length; i++) {
- buffer.append(ExternalTupleSet.VALUE_DELIM + splitValOrderString[i]);
- valueOrders.add(buffer.substring(0));
- }
-
- return valueOrders;
- }
-
- /**
- * This method verifies that all common variables have a common value and
- * then joins the BindingSets together. In the case that the PCJ contains a
- * LeftJoin, if the leftBs and rightBs have a common variable with distinct
- * values and that common variable is unassured (only appears in LeftJoin),
- * this method uses the value corresponding to leftBs.
- *
- * @param leftBs
- * - BindingSet passed into PCJ
- * @param rightBs
- * - PCJ BindingSet
- * @return - joined BindingSet
- */
- private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) {
-
- Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(),
- rightBs.getBindingNames());
- // compare values associated with common variables to make sure
- // BindingSets can be joined. Possible for leftBs and rightBs
- // to have a common unAssuredVariable in event PCJ contains LeftJoin.
- // if values corresponding to common unAssuredVariable do not agree
- // add value corresponding to leftBs
- for (String s : commonVars) {
- if (!leftBs.getValue(s).equals(rightBs.getValue(s))
- && !unAssuredVariables.contains(s)) {
- return EMPTY_BINDINGSET;
- }
- }
- QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs));
-
- rightBs = removeConstants(rightBs);
- // only add Bindings corresponding to variables that have no value
- // assigned. This takes into account case where leftBs and rightBs
- // share a common, unAssuredVariable. In this case, use value
- // corresponding
- // to leftBs, which is effectively performing a LeftJoin.
- for (String s : rightBs.getBindingNames()) {
- if (bs.getValue(s) == null) {
- bs.addBinding(s, rightBs.getValue(s));
- }
- }
-
- return bs;
- }
-
- private BindingSet removeConstants(BindingSet bs) {
- QueryBindingSet bSet = new QueryBindingSet();
- for (String s : bs.getBindingNames()) {
- if (!s.startsWith(ExternalTupleSet.CONST_PREFIX)) {
- bSet.addBinding(bs.getBinding(s));
- }
- }
- return bSet;
- }
-
- /**
- * This method returns an Iterator which joins the given Entry's BindingSet
- * to all BindingSets which matching the Entry's key.
- *
- * @param entry - entry to be joined
- * @return - Iterator over joined BindingSets
- */
- private Iterator<BindingSet> joinBindingSetEntry(
- Map.Entry<String, BindingSet> entry) {
-
- List<Collection<BindingSet>> matches = new ArrayList<>();
- if (type == HashJoinType.CONSTANT_JOIN_VAR) {
- if (bindingJoinVarHash.containsKey(entry.getKey())) {
- matches.add(bindingJoinVarHash.get(entry.getKey()));
- }
- } else {
- List<String> valOrders = getValueOrders(entry.getKey());
- for (String s : valOrders) {
- if (bindingJoinVarHash.containsKey(s)) {
- matches.add(bindingJoinVarHash.get(s));
- }
- }
- }
-
- if (matches.size() == 0) {
- return Collections.emptyIterator();
- } else {
- return new BindingSetCollectionsJoinIterator(entry.getValue(),
- matches);
- }
-
- }
-
- /**
- * Given a BindingSet and a List of Collections of BindingSets, this
- * Iterator joins the BindingSet with the BindingSets in each Collection
- *
- */
- private class BindingSetCollectionsJoinIterator implements
- Iterator<BindingSet> {
-
- private Iterator<Collection<BindingSet>> collectionIter;
- private Iterator<BindingSet> bsIter = Collections.emptyIterator();
- private BindingSet next;
- private BindingSet joinBs;
- private boolean hasNextCalled = false;
- private boolean isEmpty = false;
-
- public BindingSetCollectionsJoinIterator(BindingSet bs,
- List<Collection<BindingSet>> collection) {
- this.collectionIter = collection.iterator();
- this.joinBs = bs;
- }
-
- @Override
- public boolean hasNext() {
-
- if (!hasNextCalled && !isEmpty) {
- while (bsIter.hasNext() || collectionIter.hasNext()) {
- if (!bsIter.hasNext()) {
- bsIter = collectionIter.next().iterator();
- }
- next = joinBindingSets(bsIter.next(), joinBs);
- if (next == EMPTY_BINDINGSET) {
- continue;
- }
- hasNextCalled = true;
- return true;
- }
- isEmpty = true;
- return false;
- } else if (isEmpty) {
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public BindingSet next() {
- if (hasNextCalled) {
- hasNextCalled = false;
- } else if (isEmpty) {
- throw new NoSuchElementException();
- } else {
- if (this.hasNext()) {
- hasNextCalled = false;
- } else {
- throw new NoSuchElementException();
- }
- }
- return next;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java
deleted file mode 100644
index 2407865..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package mvm.rya.accumulo.pcj.iterators;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.QueryEvaluationException;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This {@link CloseableIteration} takes in a list of CloseableIterations
- * and merges them together into a single CloseableIteration.
- *
- */
-public class IteratorCombiner implements
- CloseableIteration<BindingSet, QueryEvaluationException> {
-
-
- private Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators;
- private Iterator<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorIterator;
- private CloseableIteration<BindingSet, QueryEvaluationException> currIter;
- private boolean isEmpty = false;
- private boolean hasNextCalled = false;
- private BindingSet next;
-
- public IteratorCombiner(Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators) {
- Preconditions.checkArgument(iterators.size() > 0);
- this.iterators = iterators;
- iteratorIterator = iterators.iterator();
- currIter = iteratorIterator.next();
- }
-
- @Override
- public boolean hasNext() throws QueryEvaluationException {
- if (!hasNextCalled && !isEmpty) {
- while (currIter.hasNext() || iteratorIterator.hasNext()) {
- if(!currIter.hasNext()) {
- currIter = iteratorIterator.next();
- }
- if(!currIter.hasNext()) {
- continue;
- }
- next = currIter.next();
- hasNextCalled = true;
- return true;
- }
- isEmpty = true;
- return false;
- } else if (isEmpty) {
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public BindingSet next() throws QueryEvaluationException {
- if (hasNextCalled) {
- hasNextCalled = false;
- } else if (isEmpty) {
- throw new NoSuchElementException();
- } else {
- if (this.hasNext()) {
- hasNextCalled = false;
- } else {
- throw new NoSuchElementException();
- }
- }
- return next;
- }
-
- @Override
- public void remove() throws QueryEvaluationException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws QueryEvaluationException {
- for(CloseableIteration<BindingSet, QueryEvaluationException> iterator: iterators) {
- iterator.close();
- }
- }
-
-}