You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by mi...@apache.org on 2015/12/22 17:50:17 UTC

[53/56] [abbrv] incubator-rya git commit: RYA-12 Adding support for Additional Iterators on Core Tables

RYA-12 Adding support for Additional Iterators on Core Tables

Note: the config file format may change in the future.  This commit only
changes the client Config API and the Query Engine.

This commit also pulls in an orphaned "Manual Flush" commit that never
made it into the repo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e6be84a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e6be84a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e6be84a4

Branch: refs/heads/master
Commit: e6be84a407e05c66cb4b4b6ef225d7e07dd10fcf
Parents: 990f1ff
Author: Aaron Mihalik <mi...@alum.mit.edu>
Authored: Fri Dec 4 20:25:51 2015 -0500
Committer: Aaron Mihalik <mi...@alum.mit.edu>
Committed: Fri Dec 4 20:25:51 2015 -0500

----------------------------------------------------------------------
 .../rya/accumulo/AccumuloRdfConfiguration.java  | 74 +++++++++++++++++++-
 .../java/mvm/rya/accumulo/AccumuloRyaDAO.java   | 33 ++++-----
 .../accumulo/query/AccumuloRyaQueryEngine.java  |  6 ++
 .../accumulo/AccumuloRdfConfigurationTest.java  | 34 ++++++---
 .../mvm/rya/accumulo/AccumuloRyaDAOTest.java    | 58 +++++++++++++--
 .../accumulo/entity/AccumuloDocIndexerTest.java |  3 +-
 6 files changed, 173 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
index 147228b..709ceb9 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java
@@ -21,11 +21,17 @@ package mvm.rya.accumulo;
 
 
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import mvm.rya.accumulo.experimental.AccumuloIndexer;
 import mvm.rya.api.RdfCloudTripleStoreConfiguration;
 
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.conf.Configuration;
 
@@ -42,6 +48,17 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public static final String MAXRANGES_SCANNER = "ac.query.maxranges";
     
     public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
+    
+    public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush";
+
+    public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size";
+    public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d.";
+    public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name";
+    public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass";
+    public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority";
+    public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize";
+    public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name";
+    public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value";
 
     public AccumuloRdfConfiguration() {
         super();
@@ -73,7 +90,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
 
     public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) {
         List<String> strs = Lists.newArrayList();
-        for (Class ai : indexers){
+        for (Class<? extends AccumuloIndexer> ai : indexers){
             strs.add(ai.getName());
         }
         
@@ -83,4 +100,59 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
     public List<AccumuloIndexer> getAdditionalIndexers() {
         return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class);
     }
+    public boolean flushEachUpdate(){
+        return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
+    }
+
+    public void setFlush(boolean flush){
+        setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
+    }
+
+    public void setAdditionalIterators(IteratorSetting... additionalIterators){
+        //TODO do we need to worry about cleaning up
+        this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length));
+        int i = 0;
+        for(IteratorSetting iterator : additionalIterators) {
+            this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName());
+            this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass());
+            this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority()));
+            Map<String, String> options = iterator.getOptions();
+
+            this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size()));
+            Iterator<Entry<String, String>> it = options.entrySet().iterator();
+            int j = 0;
+            while(it.hasNext()) {
+                Entry<String, String> item = it.next();
+                this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey());
+                this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue());
+                j++;
+            }
+            i++;
+        }
+    }
+
+    public IteratorSetting[] getAdditionalIterators(){
+        int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
+        if(size == 0) {
+            return new IteratorSetting[0];
+        }
+
+        IteratorSetting[] settings = new IteratorSetting[size];
+        for(int i = 0; i < size; i++) {
+            String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
+            String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
+            int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
+
+            int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
+            Map<String, String> options = new HashMap<String, String>(optionsSize);
+            for(int j = 0; j < optionsSize; j++) {
+                String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
+                String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
+                options.put(key, value);
+            }
+            settings[i] = new IteratorSetting(priority, name, iteratorClass, options);
+        }
+
+        return settings;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
index 84fae68..8a6bd00 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java
@@ -79,15 +79,11 @@ import mvm.rya.api.resolver.RyaTripleContext;
 import mvm.rya.api.resolver.triple.TripleRow;
 import mvm.rya.api.resolver.triple.TripleRowResolverException;
 
-/**
- * Class AccumuloRyaDAO
- * Date: Feb 29, 2012
- * Time: 12:37:22 PM
- */
 public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
     private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
 
     private boolean initialized = false;
+    private boolean flushEachUpdate = true;
     private Connector connector;
     private BatchWriterConfig batchWriterConfig;
 
@@ -134,6 +130,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
 
             secondaryIndexers = conf.getAdditionalIndexers();
 
+            flushEachUpdate = conf.flushEachUpdate();
+            
             TableOperations tableOperations = connector.tableOperations();
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
             AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
@@ -151,9 +149,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
             bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
 
-            bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY,
-                    MAX_TIME, 1);
-
+            bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
+            
             for (AccumuloIndexer index : secondaryIndexers) {
                 index.setMultiTableBatchWriter(mt_bw);
             }
@@ -193,7 +190,6 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     @Override
     public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
         this.delete(Iterators.singletonIterator(stmt), aconf);
-        //TODO currently all indexers do not support delete
     }
 
     @Override
@@ -211,8 +207,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
                     index.deleteStatement(stmt);
                 }
             }
-            mt_bw.flush();
-            //TODO currently all indexers do not support delete
+            if (flushEachUpdate) { mt_bw.flush(); }
         } catch (Exception e) {
             throw new RyaDAOException(e);
         }
@@ -299,7 +294,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
                 }
             }
 
-            mt_bw.flush();
+            if (flushEachUpdate) { mt_bw.flush(); }
         } catch (Exception e) {
             throw new RyaDAOException(e);
         }
@@ -314,10 +309,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         try {
             initialized = false;
             mt_bw.flush();
-            bw_ns.flush();
 
             mt_bw.close();
-            bw_ns.close();
         } catch (Exception e) {
             throw new RyaDAOException(e);
         }
@@ -329,7 +322,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             Mutation m = new Mutation(new Text(pfx));
             m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
             bw_ns.addMutation(m);
-            bw_ns.flush();
+            if (flushEachUpdate) { mt_bw.flush(); }
         } catch (Exception e) {
             throw new RyaDAOException(e);
         }
@@ -360,7 +353,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
             Mutation del = new Mutation(new Text(pfx));
             del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
             bw_ns.addMutation(del);
-            bw_ns.flush();
+            if (flushEachUpdate) { mt_bw.flush(); }
         } catch (Exception e) {
             throw new RyaDAOException(e);
         }
@@ -464,6 +457,14 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
         this.queryEngine = queryEngine;
     }
 
+    public void flush() throws RyaDAOException {
+        try {
+            mt_bw.flush();
+        } catch (MutationsRejectedException e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
     protected String[] getTables() {
         // core tables
         List<String> tableNames = Lists.newArrayList(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
index 1d0d9c9..869a128 100644
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
@@ -388,6 +388,12 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu
             RegExFilter.setRegexs(setting, regex, null, null, null, false);
             scanner.addScanIterator(setting);
         }
+        if (conf instanceof AccumuloRdfConfiguration) {
+            //TODO should we take the iterator settings as is or should we adjust the priority based on the above?
+            for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) {
+                scanner.addScanIterator(itr);
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
index b7c9079..ffd316e 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java
@@ -21,20 +21,19 @@ package mvm.rya.accumulo;
 
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.security.Authorizations;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Date: 1/28/13
- * Time: 8:36 AM
- */
 public class AccumuloRdfConfigurationTest {
     private static final Logger logger = LoggerFactory.getLogger(AccumuloRdfConfigurationTest.class);
 
@@ -56,4 +55,21 @@ public class AccumuloRdfConfigurationTest {
         assertEquals(str, conf.getAuth());
         assertEquals(auths, conf.getAuthorizations());
     }
+
+    @Test
+    public void testIterators() {
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+        Map<String, String> options = new HashMap<String, String>();
+        options.put("key1", "value1");
+        options.put("key2", "value2");
+        IteratorSetting setting = new IteratorSetting(1, "test", "test2", options);
+
+        conf.setAdditionalIterators(setting);
+        IteratorSetting[] iteratorSettings = conf.getAdditionalIterators();
+        assertTrue(iteratorSettings.length == 1);
+
+        assertEquals(setting, iteratorSettings[0]);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
index ab4528b..5c30e67 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java
@@ -21,9 +21,18 @@ package mvm.rya.accumulo;
 
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import info.aduna.iteration.CloseableIteration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
 import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaType;
 import mvm.rya.api.domain.RyaURI;
@@ -33,10 +42,11 @@ import mvm.rya.api.resolver.RdfToRyaConversions;
 import mvm.rya.api.resolver.RyaContext;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.mock.MockInstance;
-import org.calrissian.mango.collect.CloseableIterable;
+import org.apache.accumulo.core.iterators.FirstEntryInRowIterator;
 import org.calrissian.mango.collect.FluentCloseableIterable;
 import org.junit.After;
 import org.junit.Before;
@@ -44,11 +54,6 @@ import org.junit.Test;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
-
-import java.util.*;
-
-import static org.junit.Assert.*;
 
 /**
  * Class AccumuloRdfDAOTest
@@ -631,6 +636,45 @@ public class AccumuloRyaDAOTest {
         assertFalse(dao.isInitialized());
     }
 
+    @Test
+    public void testQueryWithIterators() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual1"));
+        dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual2"));
+
+        AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf);
+        IteratorSetting firstEntryInRow = new IteratorSetting(3 /* correct value?? */, FirstEntryInRowIterator.class);
+        queryConf.setAdditionalIterators(firstEntryInRow);
+
+        Collection<RyaStatement> coll = new ArrayList<>();
+        coll.add(new RyaStatement(null, loadPerc, uri1));
+        CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(1, count);
+
+        //Assert that without the iterator we get 2
+        coll = new ArrayList<>();
+        coll.add(new RyaStatement(null, loadPerc, uri1));
+        iter = queryEngine.batchQuery(coll, conf);
+        count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(2, count);
+
+    }
+
     private boolean areTablesEmpty() throws TableNotFoundException {
         for (String table : dao.getTables()) {
             if (tableExists(table)) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
index e7e06d9..6237697 100644
--- a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java
@@ -26,14 +26,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
 import mvm.rya.accumulo.AccumuloRdfConfiguration;
 import mvm.rya.accumulo.RyaTableMutationsFactory;
 import mvm.rya.api.RdfCloudTripleStoreConstants;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaType;
 import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TablePrefixLayoutStrategy;
 import mvm.rya.api.resolver.RyaToRdfConversions;
 import mvm.rya.api.resolver.RyaTripleContext;
 import mvm.rya.indexing.accumulo.ConfigUtils;
@@ -43,6 +41,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.openrdf.model.Value;