You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/09/25 19:18:08 UTC

[2/3] incubator-rya git commit: RYA-266 Issue fixed where callers don-t call init on the index.

RYA-266 Issue fixed where callers don-t call init on the index.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f7b2fd63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f7b2fd63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f7b2fd63

Branch: refs/heads/master
Commit: f7b2fd63167359fd829cd9ee73216c5c9736d1a8
Parents: 7dd5913
Author: David Lotts <da...@parsons.com>
Authored: Fri Sep 22 14:48:05 2017 -0400
Committer: David Lotts <da...@parsons.com>
Committed: Fri Sep 22 14:48:05 2017 -0400

----------------------------------------------------------------------
 .../freetext/AccumuloFreeTextIndexer.java       | 65 ++++++++++++++++----
 .../temporal/AccumuloTemporalIndexer.java       | 39 +++++++++---
 .../freetext/AccumuloFreeTextIndexerTest.java   |  8 +++
 3 files changed, 91 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
index 8c07a8c..9078015 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -213,7 +214,17 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
 
     private boolean isInit = false;
 
-
+    /**
+     * Called by setConf to initialize query only.  
+     * Use this alone if usage does not require writing.
+     * For a writeable (store and delete) version of this, 
+     * call setconf() and then setMultiTableBatchWriter(), then call init()
+     * that is what the DAO does.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws TableNotFoundException
+     * @throws TableExistsException
+     */
     private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
             TableExistsException {
         final String doctable = getFreeTextDocTablename(conf);
@@ -260,19 +271,25 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
             tableOps.setProperty(doctable, "table.bloom.enabled", Boolean.TRUE.toString());
         }
 
-        mtbw = ConfigUtils.createMultitableBatchWriter(conf);
-
-        docTableBw = mtbw.getBatchWriter(doctable);
-        termTableBw = mtbw.getBatchWriter(termtable);
-
+        // Set mtbw by calling setMultiTableBatchWriter().  The DAO does this and manages flushing.
+        // If you create it here, tests work, but a real Accumulo may lose writes due to unmanaged flushing.
+        if (mtbw != null) {
+	        docTableBw = mtbw.getBatchWriter(doctable);
+	        termTableBw = mtbw.getBatchWriter(termtable);
+        }
         tokenizer = ConfigUtils.getFreeTextTokenizer(conf);
         validPredicates = ConfigUtils.getFreeTextPredicates(conf);
 
         queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf);
     }
 
-
-  //initialization occurs in setConf because index is created using reflection
+    /**
+     * setConf sets the configuration and then initializes for query only.  
+     * Use this alone if usage does not require writing.
+     * For a writeable (store and delete) version of this, 
+     * call this and then setMultiTableBatchWriter(), then call init()
+     * that is what the DAO does.
+     */
     @Override
     public void setConf(final Configuration conf) {
         this.conf = conf;
@@ -294,6 +311,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
 
 
     private void storeStatement(final Statement statement) throws IOException {
+        Objects.requireNonNull(mtbw, "Freetext indexer attempting to store, but setMultiTableBatchWriter() was not set.");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -393,7 +412,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
     @Override
     public void close() throws IOException {
         try {
-            mtbw.close();
+        	if (mtbw!=null)
+        		mtbw.close();
         } catch (final MutationsRejectedException e) {
             logger.error("error closing the batch writer", e);
             throw new IOException(e);
@@ -636,7 +656,13 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
         return makeFreeTextDocTablename( ConfigUtils.getTablePrefix(conf) );
     }
 
-    /**
+    @Override
+	public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+        mtbw = writer;
+	}
+
+
+	/**
      * Get the Term index's table name.
      *
      * @param conf - The Rya configuration that specifies which instance of Rya
@@ -684,6 +710,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
     }
 
     private void deleteStatement(final Statement statement) throws IOException {
+        Objects.requireNonNull(mtbw, "Freetext indexer attempting to delete, but setMultiTableBatchWriter() was not set.");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -795,10 +823,21 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
     }
 
 
-	@Override
+	/** 
+	 * called by the DAO after setting the mtbw.
+	 * The rest of the initilization is done by setConf()
+	 */
+    @Override
 	public void init() {
-		// TODO Auto-generated method stub
-
+        Objects.requireNonNull(mtbw, "Freetext indexer failed to initialize temporal index, setMultiTableBatchWriter() was not set.");
+        Objects.requireNonNull(conf, "Freetext indexer failed to initialize temporal index, setConf() was not set.");
+        try {
+			docTableBw = mtbw.getBatchWriter(getFreeTextDocTablename(conf));
+			termTableBw = mtbw.getBatchWriter(getFreeTextTermTablename(conf));
+		} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+			logger.error("Unable to initialize index.  Throwing Runtime Exception. ", e);
+            throw new RuntimeException(e);		
+        }
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index 042d938..6a78680 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -116,37 +117,56 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
     public void init() {
         if (!isInit) {
             try {
-                initInternal();
+                initReadWrite();
                 isInit = true;
             } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) {
-                logger.warn("Unable to initialize index.  Throwing Runtime Exception. ", e);
+                logger.error("Unable to initialize index.  Throwing Runtime Exception. ", e);
                 throw new RuntimeException(e);
             }
         }
     }
-
-    private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+    /**
+     * Initialize for writable use.  
+     * This is called from the DAO, perhaps others.
+     */
+    private void initReadWrite() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
                     TableExistsException, RyaClientException {
         if (mtbw == null)
             throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set.");
         if (conf == null)
             throw new RyaClientException("Failed to initialize temporal index, setConf() was not set.");
+        if (temporalIndexTableName==null)
+            throw new RyaClientException("Failed to set temporalIndexTableName==null.");
 
-        temporalIndexTableName = getTableName();
+        // Now do all the writable setup, read should already be complete.
         // Create one index table on first run.
         Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);
         if (isCreated) {
             logger.info("First run, created temporal index table: " + temporalIndexTableName);
         }
         temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);
-        validPredicates = ConfigUtils.getTemporalPredicates(conf);
     }
 
-    //initialization occurs in setConf because index is created using reflection
+    /**
+     * Initialize everything for a query-only use.  
+     * This is called from setConf, since that must be called by anyone.
+     * The DAO will also call setMultiTableBatchWriter() and init().
+     */
+	private void initReadOnly()  {
+		if (conf == null)
+			throw new Error("Failed to initialize temporal index, setConf() was not set.");
+		temporalIndexTableName = getTableName();
+		validPredicates = ConfigUtils.getTemporalPredicates(conf);
+	}
+
+    /**
+     * Set the configuration, then initialize for read (query) use only.
+     * Readonly initialization occurs in setConf because it does not require setting a multitablebatchwriter (mtbw).
+     */
     @Override
     public void setConf(final Configuration conf) {
         this.conf = conf;
-
+			initReadOnly();
     }
 
     @Override
@@ -165,6 +185,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
      * T O D O parse an interval using multiple predicates for same subject -- ontology dependent.
      */
     private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException {
+    	Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing.  Must call setMultiTableBatchWriter() and init().");
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -908,6 +929,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
     }
 
     private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException {
+    	Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing.  Must call setMultiTableBatchWriter() and init().");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
index cba550c..531085d 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
@@ -93,6 +93,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testSearch() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
 
@@ -134,6 +136,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testDelete() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
 
@@ -187,6 +191,8 @@ public class AccumuloFreeTextIndexerTest {
 
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             // These should not be stored because they are not in the predicate list
             f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid")));
@@ -222,6 +228,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testContextSearch() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
             URI subject = new URIImpl("foo:subj");