You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by mi...@apache.org on 2015/12/22 17:50:17 UTC
[53/56] [abbrv] incubator-rya git commit: RYA-12 Adding support for
Additional Iterators on Core Tables
RYA-12 Adding support for Additional Iterators on Core Tables
Note: the configuration file format may change in the future. This commit
only changes the client Config API and the Query Engine.
It also pulls in an orphaned "Manual Flush" commit that never made it
into the repo.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e6be84a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e6be84a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e6be84a4
Branch: refs/heads/master
Commit: e6be84a407e05c66cb4b4b6ef225d7e07dd10fcf
Parents: 990f1ff
Author: Aaron Mihalik <mi...@alum.mit.edu>
Authored: Fri Dec 4 20:25:51 2015 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Fri Dec 4 20:25:51 2015 -0500
----------------------------------------------------------------------
.../rya/accumulo/AccumuloRdfConfiguration.java | 74 +++++++++++++++++++-
.../java/mvm/rya/accumulo/AccumuloRyaDAO.java | 33 ++++-----
.../accumulo/query/AccumuloRyaQueryEngine.java | 6 ++
.../accumulo/AccumuloRdfConfigurationTest.java | 34 ++++++---
.../mvm/rya/accumulo/AccumuloRyaDAOTest.java | 58 +++++++++++++--
.../accumulo/entity/AccumuloDocIndexerTest.java | 3 +-
6 files changed, 173 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
index 147228b..709ceb9 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
@@ -21,11 +21,17 @@ package mvm.rya.accumulo;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import mvm.rya.accumulo.experimental.AccumuloIndexer;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +48,17 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public static final String MAXRANGES_SCANNER = "ac.query.maxranges";
public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
+
+ /** Boolean key read by flushEachUpdate() (default true) and written by setFlush(). */
+ public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush";
+
+ // Keys used by setAdditionalIterators()/getAdditionalIterators().
+ // The "%d" placeholders are filled in with String.format: the first one is the
+ // zero-based iterator index; in the option keys the second one is the
+ // zero-based option index within that iterator.
+ /** Number of stored iterator settings. */
+ public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size";
+ /** Common prefix of every per-iterator key; takes the iterator index. */
+ public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d.";
+ /** Stores IteratorSetting.getName(). */
+ public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name";
+ /** Stores IteratorSetting.getIteratorClass(). */
+ public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass";
+ /** Stores IteratorSetting.getPriority() as a decimal string. */
+ public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority";
+ /** Number of options stored for the iterator. */
+ public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize";
+ /** Option name; takes the iterator index and the option index. */
+ public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name";
+ /** Option value; takes the iterator index and the option index. */
+ public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value";
public AccumuloRdfConfiguration() {
super();
@@ -73,7 +90,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) {
List<String> strs = Lists.newArrayList();
- for (Class ai : indexers){
+ for (Class<? extends AccumuloIndexer> ai : indexers){
strs.add(ai.getName());
}
@@ -83,4 +100,59 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
public List<AccumuloIndexer> getAdditionalIndexers() {
return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class);
}
+ /**
+  * Reads the {@link #CONF_FLUSH_EACH_UPDATE} flag.
+  *
+  * @return the configured value, or {@code true} when the key is not set
+  */
+ public boolean flushEachUpdate(){
+     final boolean flushWhenUnset = true;
+     return this.getBoolean(CONF_FLUSH_EACH_UPDATE, flushWhenUnset);
+ }
+
+ /**
+  * Stores the {@link #CONF_FLUSH_EACH_UPDATE} flag.
+  *
+  * @param flush value later returned by {@link #flushEachUpdate()}
+  */
+ public void setFlush(boolean flush){
+     this.setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
+ }
+
+ /**
+  * Serializes the given iterator settings into this configuration.
+  * The number of settings is written under {@link #ITERATOR_SETTINGS_SIZE}; each
+  * setting's name, class, priority and options are written under keys derived
+  * from its position in the argument list.
+  *
+  * @param additionalIterators the settings to store, in the order they should be read back
+  */
+ public void setAdditionalIterators(IteratorSetting... additionalIterators){
+     //TODO do we need to worry about cleaning up
+     // (keys written by an earlier call with more iterators/options are not removed;
+     // getAdditionalIterators() only reads up to the sizes stored here)
+     final int iteratorCount = additionalIterators.length;
+     this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(iteratorCount));
+
+     for (int idx = 0; idx < iteratorCount; idx++) {
+         final IteratorSetting current = additionalIterators[idx];
+         this.set(String.format(ITERATOR_SETTINGS_NAME, idx), current.getName());
+         this.set(String.format(ITERATOR_SETTINGS_CLASS, idx), current.getIteratorClass());
+         this.set(String.format(ITERATOR_SETTINGS_PRIORITY, idx), Integer.toString(current.getPriority()));
+
+         final Map<String, String> opts = current.getOptions();
+         this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, idx), Integer.toString(opts.size()));
+
+         // Options are numbered in the map's own iteration order.
+         int optIdx = 0;
+         for (Entry<String, String> opt : opts.entrySet()) {
+             this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, idx, optIdx), opt.getKey());
+             this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, idx, optIdx), opt.getValue());
+             optIdx++;
+         }
+     }
+ }
+
+ /**
+  * Rebuilds the iterator settings stored by
+  * {@link #setAdditionalIterators(IteratorSetting...)}.
+  *
+  * @return one IteratorSetting per stored index; an empty array when
+  *         {@link #ITERATOR_SETTINGS_SIZE} is unset or zero
+  * @throws NumberFormatException if a stored size or priority is missing or is
+  *         not an integer; the message names the offending key
+  */
+ public IteratorSetting[] getAdditionalIterators(){
+     int size = parseIteratorInt(ITERATOR_SETTINGS_SIZE, this.get(ITERATOR_SETTINGS_SIZE, "0"));
+
+     // A size of zero yields an empty array because the loop body never runs.
+     IteratorSetting[] settings = new IteratorSetting[size];
+     for(int i = 0; i < size; i++) {
+         String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
+         String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
+
+         String priorityKey = String.format(ITERATOR_SETTINGS_PRIORITY, i);
+         int priority = parseIteratorInt(priorityKey, this.get(priorityKey));
+
+         String optionsSizeKey = String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i);
+         int optionsSize = parseIteratorInt(optionsSizeKey, this.get(optionsSizeKey));
+
+         Map<String, String> options = new HashMap<String, String>(optionsSize);
+         for(int j = 0; j < optionsSize; j++) {
+             String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
+             String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
+             options.put(key, value);
+         }
+         settings[i] = new IteratorSetting(priority, name, iteratorClass, options);
+     }
+
+     return settings;
+ }
+
+ /**
+  * Parses an integer configuration value, reporting the configuration key on failure.
+  * NumberFormatException is kept as the failure type so that existing callers
+  * catching it keep working.
+  *
+  * @param key the configuration key the value was read from (used in messages only)
+  * @param rawValue the stored string, or {@code null} when the key is absent
+  * @return the parsed value
+  * @throws NumberFormatException if rawValue is {@code null} or not a decimal integer
+  */
+ private static int parseIteratorInt(String key, String rawValue) {
+     if (rawValue == null) {
+         throw new NumberFormatException("Missing iterator setting '" + key + "'");
+     }
+     try {
+         return Integer.parseInt(rawValue);
+     } catch (NumberFormatException e) {
+         NumberFormatException detailed = new NumberFormatException(
+                 "Iterator setting '" + key + "' is not an integer: '" + rawValue + "'");
+         detailed.initCause(e);
+         throw detailed;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
index 84fae68..8a6bd00 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
@@ -79,15 +79,11 @@ import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;
-/**
- * Class AccumuloRyaDAO
- * Date: Feb 29, 2012
- * Time: 12:37:22 PM
- */
public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
private boolean initialized = false;
+ private boolean flushEachUpdate = true;
private Connector connector;
private BatchWriterConfig batchWriterConfig;
@@ -134,6 +130,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
secondaryIndexers = conf.getAdditionalIndexers();
+ flushEachUpdate = conf.flushEachUpdate();
+
TableOperations tableOperations = connector.tableOperations();
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
@@ -151,9 +149,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
- bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY,
- MAX_TIME, 1);
-
+ bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
+
for (AccumuloIndexer index : secondaryIndexers) {
index.setMultiTableBatchWriter(mt_bw);
}
@@ -193,7 +190,6 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
@Override
public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
this.delete(Iterators.singletonIterator(stmt), aconf);
- //TODO currently all indexers do not support delete
}
@Override
@@ -211,8 +207,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
index.deleteStatement(stmt);
}
}
- mt_bw.flush();
- //TODO currently all indexers do not support delete
+ if (flushEachUpdate) { mt_bw.flush(); }
} catch (Exception e) {
throw new RyaDAOException(e);
}
@@ -299,7 +294,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
}
}
- mt_bw.flush();
+ if (flushEachUpdate) { mt_bw.flush(); }
} catch (Exception e) {
throw new RyaDAOException(e);
}
@@ -314,10 +309,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
try {
initialized = false;
mt_bw.flush();
- bw_ns.flush();
mt_bw.close();
- bw_ns.close();
} catch (Exception e) {
throw new RyaDAOException(e);
}
@@ -329,7 +322,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
Mutation m = new Mutation(new Text(pfx));
m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
bw_ns.addMutation(m);
- bw_ns.flush();
+ if (flushEachUpdate) { mt_bw.flush(); }
} catch (Exception e) {
throw new RyaDAOException(e);
}
@@ -360,7 +353,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
Mutation del = new Mutation(new Text(pfx));
del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
bw_ns.addMutation(del);
- bw_ns.flush();
+ if (flushEachUpdate) { mt_bw.flush(); }
} catch (Exception e) {
throw new RyaDAOException(e);
}
@@ -464,6 +457,14 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
this.queryEngine = queryEngine;
}
+ /**
+  * Flushes {@code mt_bw} on demand. The update paths in this class flush only
+  * when {@code flushEachUpdate} is true, so callers that turn it off use this
+  * method to push pending mutations.
+  *
+  * @throws RyaDAOException wrapping the MutationsRejectedException raised by the flush
+  */
+ public void flush() throws RyaDAOException {
+     try {
+         mt_bw.flush();
+     } catch (MutationsRejectedException rejected) {
+         throw new RyaDAOException(rejected);
+     }
+ }
+
protected String[] getTables() {
// core tables
List<String> tableNames = Lists.newArrayList(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
index 1d0d9c9..869a128 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
@@ -388,6 +388,12 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu
RegExFilter.setRegexs(setting, regex, null, null, null, false);
scanner.addScanIterator(setting);
}
+ if (conf instanceof AccumuloRdfConfiguration) {
+ //TODO should we take the iterator settings as is or should we adjust the priority based on the above?
+ for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) {
+ scanner.addScanIterator(itr);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
index b7c9079..ffd316e 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
@@ -21,20 +21,19 @@ package mvm.rya.accumulo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.security.Authorizations;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Date: 1/28/13
- * Time: 8:36 AM
- */
public class AccumuloRdfConfigurationTest {
private static final Logger logger = LoggerFactory.getLogger(AccumuloRdfConfigurationTest.class);
@@ -56,4 +55,21 @@ public class AccumuloRdfConfigurationTest {
assertEquals(str, conf.getAuth());
assertEquals(auths, conf.getAuthorizations());
}
+
+ /**
+  * Verifies that an IteratorSetting, including its options, survives a round trip
+  * through setAdditionalIterators / getAdditionalIterators.
+  */
+ @Test
+ public void testIterators() {
+     AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+     Map<String, String> options = new HashMap<String, String>();
+     options.put("key1", "value1");
+     options.put("key2", "value2");
+     IteratorSetting setting = new IteratorSetting(1, "test", "test2", options);
+
+     conf.setAdditionalIterators(setting);
+     IteratorSetting[] iteratorSettings = conf.getAdditionalIterators();
+
+     // assertEquals reports the actual length on failure; assertTrue(a == b) does not.
+     assertEquals(1, iteratorSettings.length);
+     assertEquals(setting, iteratorSettings[0]);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
index ab4528b..5c30e67 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
@@ -21,9 +21,18 @@ package mvm.rya.accumulo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import info.aduna.iteration.CloseableIteration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
@@ -33,10 +42,11 @@ import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaContext;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mock.MockInstance;
-import org.calrissian.mango.collect.CloseableIterable;
+import org.apache.accumulo.core.iterators.FirstEntryInRowIterator;
import org.calrissian.mango.collect.FluentCloseableIterable;
import org.junit.After;
import org.junit.Before;
@@ -44,11 +54,6 @@ import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
-
-import java.util.*;
-
-import static org.junit.Assert.*;
/**
* Class AccumuloRdfDAOTest
@@ -631,6 +636,45 @@ public class AccumuloRyaDAOTest {
assertFalse(dao.isInitialized());
}
+ /**
+  * Adds two statements that differ only in their qualifier, then checks that a
+  * batch query returns one result when a FirstEntryInRowIterator is configured
+  * through setAdditionalIterators, and two results without it.
+  */
+ @Test
+ public void testQueryWithIterators() throws Exception {
+     RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+     RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+     RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+     dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual1"));
+     dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual2"));
+
+     AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+     AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf);
+     IteratorSetting firstEntryInRow = new IteratorSetting(3 /* correct value?? */, FirstEntryInRowIterator.class);
+     queryConf.setAdditionalIterators(firstEntryInRow);
+
+     Collection<RyaStatement> coll = new ArrayList<>();
+     coll.add(new RyaStatement(null, loadPerc, uri1));
+     assertEquals(1, countAndClose(queryEngine.batchQuery(coll, queryConf)));
+
+     //Assert that without the iterator we get 2
+     coll = new ArrayList<>();
+     coll.add(new RyaStatement(null, loadPerc, uri1));
+     assertEquals(2, countAndClose(queryEngine.batchQuery(coll, conf)));
+ }
+
+ /**
+  * Drains the iteration, counting its elements, and closes it even when
+  * hasNext()/next() throws.
+  *
+  * @param iter the iteration to consume
+  * @return the number of elements the iteration produced
+  * @throws RyaDAOException if iterating or closing fails
+  */
+ private static int countAndClose(CloseableIteration<RyaStatement, RyaDAOException> iter) throws RyaDAOException {
+     try {
+         int count = 0;
+         while (iter.hasNext()) {
+             iter.next();
+             count++;
+         }
+         return count;
+     } finally {
+         iter.close();
+     }
+ }
+
private boolean areTablesEmpty() throws TableNotFoundException {
for (String table : dao.getTables()) {
if (tableExists(table)) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
index e7e06d9..6237697 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
@@ -26,14 +26,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import junit.framework.Assert;
import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.RyaTableMutationsFactory;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TablePrefixLayoutStrategy;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.indexing.accumulo.ConfigUtils;
@@ -43,6 +41,7 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.openrdf.model.Value;