You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by dl...@apache.org on 2017/09/25 19:18:07 UTC
[1/3] incubator-rya git commit: RYA-266 Added init() and other calls
where ever the Accumulo temporal indexer is created and used for storing.
Repository: incubator-rya
Updated Branches:
refs/heads/master c03c8bbef -> 86c866eda
RYA-266 Added init() and other calls where ever the Accumulo temporal indexer is created and used for storing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/7dd5913a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/7dd5913a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/7dd5913a
Branch: refs/heads/master
Commit: 7dd5913acd29addfa3e55480d6b2dfe77824c205
Parents: c03c8bb
Author: David W. Lotts <da...@parsons.com>
Authored: Fri Mar 31 14:05:07 2017 -0400
Committer: David Lotts <da...@parsons.com>
Committed: Thu Sep 21 12:38:56 2017 -0400
----------------------------------------------------------------------
.../temporal/AccumuloTemporalIndexer.java | 84 ++++++++++++--------
.../temporal/AccumuloTemporalIndexerTest.java | 32 +++++---
.../apache/rya/accumulo/mr/RyaOutputFormat.java | 17 +++-
.../rya/accumulo/mr/RyaOutputFormatTest.java | 25 +++---
4 files changed, 102 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index fcc1c58..042d938 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.KeyParts;
@@ -102,17 +103,42 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
private boolean isInit = false;
+ /**
+ * intilize the temporal index.
+ * This is dependent on a few set method calls before init:
+ * > Connector = ConfigUtils.getConnector(conf);
+ * > MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ * > // optional: temporal.setConnector(connector);
+ * > temporal.setMultiTableBatchWriter(mtbw);
+ * > temporal.init();
+ */
+ @Override
+ public void init() {
+ if (!isInit) {
+ try {
+ initInternal();
+ isInit = true;
+ } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) {
+ logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
- TableExistsException {
+ TableExistsException, RyaClientException {
+ if (mtbw == null)
+ throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set.");
+ if (conf == null)
+ throw new RyaClientException("Failed to initialize temporal index, setConf() was not set.");
+
temporalIndexTableName = getTableName();
// Create one index table on first run.
- ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);
-
- mtbw = ConfigUtils.createMultitableBatchWriter(conf);
-
+ Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);
+ if (isCreated) {
+ logger.info("First run, created temporal index table: " + temporalIndexTableName);
+ }
temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);
-
validPredicates = ConfigUtils.getTemporalPredicates(conf);
}
@@ -120,24 +146,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
@Override
public void setConf(final Configuration conf) {
this.conf = conf;
- if (!isInit) {
- try {
- initInternal();
- isInit = true;
- } catch (final AccumuloException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final AccumuloSecurityException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final TableNotFoundException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- } catch (final TableExistsException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
- throw new RuntimeException(e);
- }
- }
+
}
@Override
@@ -158,16 +167,20 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException {
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
- final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
+ final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
if (!isValidPredicate || !(statement.getObject() instanceof Literal)) {
return;
}
+
final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval
extractDateTime(statement, indexDateTimes);
if (indexDateTimes[0]==null) {
return;
}
+ if (!this.isInit)
+ throw new RuntimeException("Method .init() was not called (or failed) before attempting to store statements.");
+
// Add this as an instant, or interval.
try {
if (indexDateTimes[1] != null) {
@@ -865,8 +878,9 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
@Override
public void close() throws IOException {
try {
-
- mtbw.close();
+ if (mtbw != null) {
+ mtbw.close();
+ }
} catch (final MutationsRejectedException e) {
final String msg = "Error while closing the batch writer.";
@@ -926,12 +940,6 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
}
@Override
- public void init() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public void setConnector(final Connector connector) {
// TODO Auto-generated method stub
@@ -954,4 +962,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
// TODO Auto-generated method stub
}
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer#setMultiTableBatchWriter(org.apache.accumulo.core.client.MultiTableBatchWriter)
+ */
+ @Override
+ public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+ mtbw = writer;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
index 80824b8..839ef6a 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
@@ -21,8 +21,10 @@ package org.apache.rya.indexing.accumulo.temporal;
import static org.apache.rya.api.resolver.RdfToRyaConversions.convertStatement;
-import static org.junit.Assert.*;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.io.PrintStream;
import java.security.NoSuchAlgorithmException;
@@ -35,8 +37,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -48,8 +54,17 @@ import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.StatementSerializer;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -64,14 +79,6 @@ import org.openrdf.query.QueryEvaluationException;
import com.beust.jcommander.internal.Lists;
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.StatementConstraints;
-import org.apache.rya.indexing.StatementSerializer;
-import org.apache.rya.indexing.TemporalInstant;
-import org.apache.rya.indexing.TemporalInstantRfc3339;
-import org.apache.rya.indexing.TemporalInterval;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
/**
* JUnit tests for TemporalIndexer and it's implementation AccumuloTemporalIndexer
@@ -230,6 +237,11 @@ public final class AccumuloTemporalIndexerTest {
tIndexer = new AccumuloTemporalIndexer();
tIndexer.setConf(conf);
+ Connector connector = ConfigUtils.getConnector(conf);
+ MultiTableBatchWriter mt_bw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ tIndexer.setConnector(connector);
+ tIndexer.setMultiTableBatchWriter(mt_bw);
+ tIndexer.init();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
index 155d694..1336364 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
@@ -45,9 +45,6 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.log4j.Logger;
-import org.openrdf.model.Statement;
-import org.openrdf.model.vocabulary.XMLSchema;
-
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRdfConstants;
import org.apache.rya.accumulo.AccumuloRyaDAO;
@@ -64,6 +61,8 @@ import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.openrdf.model.Statement;
+import org.openrdf.model.vocabulary.XMLSchema;
/**
* {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the
@@ -216,12 +215,22 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
return freeText;
}
- private static TemporalIndexer getTemporalIndexer(Configuration conf) {
+ private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException {
if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
return null;
}
AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
temporal.setConf(conf);
+ Connector connector;
+ try {
+ connector = ConfigUtils.getConnector(conf);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new IOException("Error when attempting to create a connection for writing the temporal index.", e);
+ }
+ MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ temporal.setConnector(connector);
+ temporal.setMultiTableBatchWriter(mtbw);
+ temporal.init();
return temporal;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
index 0b44a92..96f57f6 100644
--- a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
@@ -5,7 +5,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mock.MockInstance;
@@ -17,15 +19,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-
-import info.aduna.iteration.CloseableIteration;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.domain.RyaStatement;
@@ -43,6 +36,15 @@ import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
import org.apache.rya.indexing.accumulo.freetext.SimpleTokenizer;
import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import info.aduna.iteration.CloseableIteration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -235,6 +237,11 @@ public class RyaOutputFormatTest {
}
final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
temporal.setConf(conf);
+ Connector connector = ConfigUtils.getConnector(conf);
+ MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ temporal.setConnector(connector);
+ temporal.setMultiTableBatchWriter(mtbw);
+ temporal.init();
final Set<Statement> empty = new HashSet<>();
final Set<Statement> head = new HashSet<>();
final Set<Statement> tail = new HashSet<>();
[2/3] incubator-rya git commit: RYA-266 Issue fixed where callers
don-t call init on the index.
Posted by dl...@apache.org.
RYA-266 Issue fixed where callers don-t call init on the index.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f7b2fd63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f7b2fd63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f7b2fd63
Branch: refs/heads/master
Commit: f7b2fd63167359fd829cd9ee73216c5c9736d1a8
Parents: 7dd5913
Author: David Lotts <da...@parsons.com>
Authored: Fri Sep 22 14:48:05 2017 -0400
Committer: David Lotts <da...@parsons.com>
Committed: Fri Sep 22 14:48:05 2017 -0400
----------------------------------------------------------------------
.../freetext/AccumuloFreeTextIndexer.java | 65 ++++++++++++++++----
.../temporal/AccumuloTemporalIndexer.java | 39 +++++++++---
.../freetext/AccumuloFreeTextIndexerTest.java | 8 +++
3 files changed, 91 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
index 8c07a8c..9078015 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -213,7 +214,17 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
private boolean isInit = false;
-
+ /**
+ * Called by setConf to initialize query only.
+ * Use this alone if usage does not require writing.
+ * For a writeable (store and delete) version of this,
+ * call setconf() and then setMultiTableBatchWriter(), then call init()
+ * that is what the DAO does.
+ * @throws AccumuloException
+ * @throws AccumuloSecurityException
+ * @throws TableNotFoundException
+ * @throws TableExistsException
+ */
private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
TableExistsException {
final String doctable = getFreeTextDocTablename(conf);
@@ -260,19 +271,25 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
tableOps.setProperty(doctable, "table.bloom.enabled", Boolean.TRUE.toString());
}
- mtbw = ConfigUtils.createMultitableBatchWriter(conf);
-
- docTableBw = mtbw.getBatchWriter(doctable);
- termTableBw = mtbw.getBatchWriter(termtable);
-
+ // Set mtbw by calling setMultiTableBatchWriter(). The DAO does this and manages flushing.
+ // If you create it here, tests work, but a real Accumulo may lose writes due to unmanaged flushing.
+ if (mtbw != null) {
+ docTableBw = mtbw.getBatchWriter(doctable);
+ termTableBw = mtbw.getBatchWriter(termtable);
+ }
tokenizer = ConfigUtils.getFreeTextTokenizer(conf);
validPredicates = ConfigUtils.getFreeTextPredicates(conf);
queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf);
}
-
- //initialization occurs in setConf because index is created using reflection
+ /**
+ * setConf sets the configuration and then initializes for query only.
+ * Use this alone if usage does not require writing.
+ * For a writeable (store and delete) version of this,
+ * call this and then setMultiTableBatchWriter(), then call init()
+ * that is what the DAO does.
+ */
@Override
public void setConf(final Configuration conf) {
this.conf = conf;
@@ -294,6 +311,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
private void storeStatement(final Statement statement) throws IOException {
+ Objects.requireNonNull(mtbw, "Freetext indexer attempting to store, but setMultiTableBatchWriter() was not set.");
+
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -393,7 +412,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
@Override
public void close() throws IOException {
try {
- mtbw.close();
+ if (mtbw!=null)
+ mtbw.close();
} catch (final MutationsRejectedException e) {
logger.error("error closing the batch writer", e);
throw new IOException(e);
@@ -636,7 +656,13 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
return makeFreeTextDocTablename( ConfigUtils.getTablePrefix(conf) );
}
- /**
+ @Override
+ public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+ mtbw = writer;
+ }
+
+
+ /**
* Get the Term index's table name.
*
* @param conf - The Rya configuration that specifies which instance of Rya
@@ -684,6 +710,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
}
private void deleteStatement(final Statement statement) throws IOException {
+ Objects.requireNonNull(mtbw, "Freetext indexer attempting to delete, but setMultiTableBatchWriter() was not set.");
+
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -795,10 +823,21 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements
}
- @Override
+ /**
+ * called by the DAO after setting the mtbw.
+ * The rest of the initilization is done by setConf()
+ */
+ @Override
public void init() {
- // TODO Auto-generated method stub
-
+ Objects.requireNonNull(mtbw, "Freetext indexer failed to initialize temporal index, setMultiTableBatchWriter() was not set.");
+ Objects.requireNonNull(conf, "Freetext indexer failed to initialize temporal index, setConf() was not set.");
+ try {
+ docTableBw = mtbw.getBatchWriter(getFreeTextDocTablename(conf));
+ termTableBw = mtbw.getBatchWriter(getFreeTextTermTablename(conf));
+ } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+ logger.error("Unable to initialize index. Throwing Runtime Exception. ", e);
+ throw new RuntimeException(e);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index 042d938..6a78680 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -116,37 +117,56 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
public void init() {
if (!isInit) {
try {
- initInternal();
+ initReadWrite();
isInit = true;
} catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) {
- logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
+ logger.error("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
}
}
}
-
- private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+ /**
+ * Initialize for writable use.
+ * This is called from the DAO, perhaps others.
+ */
+ private void initReadWrite() throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
TableExistsException, RyaClientException {
if (mtbw == null)
throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set.");
if (conf == null)
throw new RyaClientException("Failed to initialize temporal index, setConf() was not set.");
+ if (temporalIndexTableName==null)
+ throw new RyaClientException("Failed to set temporalIndexTableName==null.");
- temporalIndexTableName = getTableName();
+ // Now do all the writable setup, read should already be complete.
// Create one index table on first run.
Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);
if (isCreated) {
logger.info("First run, created temporal index table: " + temporalIndexTableName);
}
temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);
- validPredicates = ConfigUtils.getTemporalPredicates(conf);
}
- //initialization occurs in setConf because index is created using reflection
+ /**
+ * Initialize everything for a query-only use.
+ * This is called from setConf, since that must be called by anyone.
+ * The DAO will also call setMultiTableBatchWriter() and init().
+ */
+ private void initReadOnly() {
+ if (conf == null)
+ throw new Error("Failed to initialize temporal index, setConf() was not set.");
+ temporalIndexTableName = getTableName();
+ validPredicates = ConfigUtils.getTemporalPredicates(conf);
+ }
+
+ /**
+ * Set the configuration, then initialize for read (query) use only.
+ * Readonly initialization occurs in setConf because it does not require setting a multitablebatchwriter (mtbw).
+ */
@Override
public void setConf(final Configuration conf) {
this.conf = conf;
-
+ initReadOnly();
}
@Override
@@ -165,6 +185,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
* T O D O parse an interval using multiple predicates for same subject -- ontology dependent.
*/
private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException {
+ Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init().");
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
@@ -908,6 +929,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements
}
private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException {
+ Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init().");
+
// if the predicate list is empty, accept all predicates.
// Otherwise, make sure the predicate is on the "valid" list
final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
index cba550c..531085d 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
@@ -93,6 +93,8 @@ public class AccumuloFreeTextIndexerTest {
public void testSearch() throws Exception {
try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
f.setConf(conf);
+ f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+ f.init();
ValueFactory vf = new ValueFactoryImpl();
@@ -134,6 +136,8 @@ public class AccumuloFreeTextIndexerTest {
public void testDelete() throws Exception {
try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
f.setConf(conf);
+ f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+ f.init();
ValueFactory vf = new ValueFactoryImpl();
@@ -187,6 +191,8 @@ public class AccumuloFreeTextIndexerTest {
try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
f.setConf(conf);
+ f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+ f.init();
// These should not be stored because they are not in the predicate list
f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid")));
@@ -222,6 +228,8 @@ public class AccumuloFreeTextIndexerTest {
public void testContextSearch() throws Exception {
try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
f.setConf(conf);
+ f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+ f.init();
ValueFactory vf = new ValueFactoryImpl();
URI subject = new URIImpl("foo:subj");
[3/3] incubator-rya git commit: RYA-266 Create a batchWriter for
freetext indexer, the indexer no longer creates it. Closes #149
Posted by dl...@apache.org.
RYA-266 Create a batchWriter for freetext indexer, the indexer no longer creates it. Closes #149
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/86c866ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/86c866ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/86c866ed
Branch: refs/heads/master
Commit: 86c866edab94666d5e679e02b75f970cd24f98ee
Parents: f7b2fd6
Author: David Lotts <da...@parsons.com>
Authored: Mon Sep 25 11:59:33 2017 -0400
Committer: David Lotts <da...@parsons.com>
Committed: Mon Sep 25 14:51:42 2017 -0400
----------------------------------------------------------------------
.../org/apache/rya/accumulo/mr/RyaOutputFormat.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/86c866ed/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
index 1336364..5332260 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
@@ -206,12 +206,23 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
}
- private static FreeTextIndexer getFreeTextIndexer(Configuration conf) {
+ private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException {
if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
return null;
}
AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
freeText.setConf(conf);
+ Connector connector;
+ try {
+ connector = ConfigUtils.getConnector(conf);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new IOException("Error when attempting to create a connection for writing the freeText index.", e);
+ }
+ MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ freeText.setConnector(connector);
+ freeText.setMultiTableBatchWriter(mtbw);
+ freeText.init();
+
return freeText;
}