You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/15 20:06:55 UTC
[35/69] [abbrv] [partial] incubator-rya git commit: RYA-198 Renaming
Files
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
new file mode 100644
index 0000000..d13f50e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfQueryIterator.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//package mvm.rya.accumulo;
+
+//
+//import com.google.common.collect.Iterators;
+//import com.google.common.io.ByteArrayDataInput;
+//import com.google.common.io.ByteStreams;
+//import info.aduna.iteration.CloseableIteration;
+//import mvm.rya.api.RdfCloudTripleStoreConstants;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.persist.RdfDAOException;
+//import mvm.rya.api.utils.NullableStatementImpl;
+//import org.apache.accumulo.core.client.*;
+//import org.apache.accumulo.core.data.Key;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+//import org.apache.accumulo.core.iterators.user.TimestampFilter;
+//import org.apache.accumulo.core.security.Authorizations;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Resource;
+//import org.openrdf.model.Statement;
+//import org.openrdf.model.URI;
+//import org.openrdf.model.Value;
+//import org.openrdf.query.BindingSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.Collection;
+//import java.util.Collections;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.Map.Entry;
+//
+//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue;
+//
+//public class AccumuloRdfQueryIterator implements
+// CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> {
+//
+// protected final Logger logger = LoggerFactory.getLogger(getClass());
+//
+// private boolean open = false;
+// private Iterator result;
+// private Resource[] contexts;
+// private Collection<Entry<Statement, BindingSet>> statements;
+// private int numOfThreads = 20;
+//
+// private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
+// private ScannerBase scanner;
+// private boolean isBatchScanner = true;
+// private Statement statement;
+// Iterator<BindingSet> iter_bss = null;
+//
+// private boolean hasNext = true;
+// private AccumuloRdfConfiguration conf;
+// private TABLE_LAYOUT tableLayout;
+// private Text context_txt;
+//
+// private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory();
+//
+// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts)
+// throws RdfDAOException {
+// this(statements, connector, null, contexts);
+// }
+//
+// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector,
+// AccumuloRdfConfiguration conf, Resource... contexts)
+// throws RdfDAOException {
+// this.statements = statements;
+// this.contexts = contexts;
+// this.conf = conf;
+// initialize(connector);
+// open = true;
+// }
+//
+// public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector,
+// AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException {
+// this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(
+// new NullableStatementImpl(subject, predicate, object, contexts),
+// null)), connector, conf, contexts);
+// }
+//
+// protected void initialize(Connector connector)
+// throws RdfDAOException {
+// try {
+// //TODO: We cannot span multiple tables here
+// Collection<Range> ranges = new HashSet<Range>();
+//
+// result = Iterators.emptyIterator();
+// Long startTime = conf.getStartTime();
+// Long ttl = conf.getTtl();
+//
+// Resource context = null;
+// for (Entry<Statement, BindingSet> stmtbs : statements) {
+// Statement stmt = stmtbs.getKey();
+// Resource subject = stmt.getSubject();
+// URI predicate = stmt.getPredicate();
+// Value object = stmt.getObject();
+// context = stmt.getContext(); //TODO: assumes the same context for all statements
+// logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination");
+//
+// Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf);
+// tableLayout = entry.getKey();
+//// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange();
+// Range range = entry.getValue();
+// ranges.add(range);
+// rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue()));
+// }
+//
+// Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+// String auth = conf.getAuth();
+// if (auth != null) {
+// authorizations = new Authorizations(auth.split(","));
+// }
+// String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf);
+// result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges);
+//// if (isBatchScanner) {
+//// ((BatchScanner) scanner).setRanges(ranges);
+//// } else {
+//// for (Range range : ranges) {
+//// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this
+//// }
+//// }
+////
+//// if (isBatchScanner) {
+//// result = ((BatchScanner) scanner).iterator();
+//// } else {
+//// result = ((Scanner) scanner).iterator();
+//// }
+// } catch (Exception e) {
+// throw new RdfDAOException(e);
+// }
+// }
+//
+// protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException {
+//// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta)
+// if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable
+// BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads);
+// scannerBase.setRanges(ranges);
+// populateScanner(context, startTime, ttl, scannerBase);
+// return scannerBase.iterator();
+// } else {
+// isBatchScanner = false;
+// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()];
+// int i = 0;
+// for (Range range : ranges) {
+// Scanner scannerBase = connector.createScanner(table, authorizations);
+// populateScanner(context, startTime, ttl, scannerBase);
+// scannerBase.setRange(range);
+// iters[i] = scannerBase.iterator();
+// i++;
+// scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed
+// }
+// return Iterators.concat(iters);
+// }
+//
+// }
+//
+// protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException {
+// if (context != null) { //default graph
+// context_txt = new Text(writeValue(context));
+// scannerBase.fetchColumnFamily(context_txt);
+// }
+//
+//// if (!isQueryTimeBased(conf)) {
+// if (startTime != null && ttl != null) {
+//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator");
+//// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName());
+//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl);
+//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, startTime);
+// IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName());
+// TimestampFilter.setStart(setting, startTime, true);
+// TimestampFilter.setEnd(setting, startTime + ttl, true);
+// scannerBase.addScanIterator(setting);
+// } else if (ttl != null) {
+//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator");
+//// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName());
+//// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl);
+// IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
+// AgeOffFilter.setTTL(setting, ttl);
+// scannerBase.addScanIterator(setting);
+// }
+//// }
+// }
+//
+// @Override
+// public void close() throws RdfDAOException {
+// if (!open)
+// return;
+// verifyIsOpen();
+// open = false;
+// if (scanner != null && isBatchScanner) {
+// ((BatchScanner) scanner).close();
+// }
+// }
+//
+// public void verifyIsOpen() throws RdfDAOException {
+// if (!open) {
+// throw new RdfDAOException("Iterator not open");
+// }
+// }
+//
+// @Override
+// public boolean hasNext() throws RdfDAOException {
+// try {
+// if (!open)
+// return false;
+// verifyIsOpen();
+// /**
+// * For some reason, the result.hasNext returns false
+// * once at the end of the iterator, and then true
+// * for every subsequent call.
+// */
+// hasNext = (hasNext && result.hasNext());
+// return hasNext || ((iter_bss != null) && iter_bss.hasNext());
+// } catch (Exception e) {
+// throw new RdfDAOException(e);
+// }
+// }
+//
+// @Override
+// public Entry<Statement, BindingSet> next() throws RdfDAOException {
+// try {
+// if (!this.hasNext())
+// return null;
+//
+// return getStatement(result, contexts);
+// } catch (Exception e) {
+// throw new RdfDAOException(e);
+// }
+// }
+//
+// public Entry<Statement, BindingSet> getStatement(
+// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults,
+// Resource... filterContexts) throws IOException {
+// try {
+// while (true) {
+// if (iter_bss != null && iter_bss.hasNext()) {
+// return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next());
+// }
+//
+// if (rowResults.hasNext()) {
+// Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next();
+// Key key = entry.getKey();
+// ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes());
+// statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY);
+// iter_bss = rangeMap.containsKey(key).iterator();
+// } else
+// break;
+// }
+// } catch (Exception e) {
+// throw new IOException(e);
+// }
+// return null;
+// }
+//
+// @Override
+// public void remove() throws RdfDAOException {
+// next();
+// }
+//
+// public int getNumOfThreads() {
+// return numOfThreads;
+// }
+//
+// public void setNumOfThreads(int numOfThreads) {
+// this.numOfThreads = numOfThreads;
+// }
+//
+// public DefineTripleQueryRangeFactory getQueryRangeFactory() {
+// return queryRangeFactory;
+// }
+//
+// public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) {
+// this.queryRangeFactory = queryRangeFactory;
+// }
+//}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
new file mode 100644
index 0000000..157fc5a
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfUtils.java
@@ -0,0 +1,72 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.resolver.triple.TripleRow;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
+
+/**
+ * Class AccumuloRdfUtils
+ * Date: Mar 1, 2012
+ * Time: 7:15:54 PM
+ */
+public class AccumuloRdfUtils {
+ private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class);
+
+ public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException {
+ boolean tableExists = tableOperations.exists(tableName);
+ if (!tableExists) {
+ logger.debug("Creating accumulo table: " + tableName);
+ tableOperations.create(tableName);
+ }
+ }
+
+ public static Key from(TripleRow tripleRow) {
+ return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES),
+ defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES),
+ defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES),
+ defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES),
+ defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE));
+ }
+
+ public static Value extractValue(TripleRow tripleRow) {
+ return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES));
+ }
+
+ private static byte[] defaultTo(byte[] bytes, byte[] def) {
+ return bytes != null ? bytes : def;
+ }
+
+ private static Long defaultTo(Long l, Long def) {
+ return l != null ? l : def;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
new file mode 100644
index 0000000..195030e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -0,0 +1,551 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Namespace;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAO;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.RyaNamespaceManager;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> {
+ private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class);
+
+ private boolean initialized = false;
+ private boolean flushEachUpdate = true;
+ private Connector connector;
+ private BatchWriterConfig batchWriterConfig;
+
+ private MultiTableBatchWriter mt_bw;
+
+ // Do not flush these individually
+ private BatchWriter bw_spo;
+ private BatchWriter bw_po;
+ private BatchWriter bw_osp;
+
+ private BatchWriter bw_ns;
+
+ private List<AccumuloIndexer> secondaryIndexers;
+
+ private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ private RyaTableMutationsFactory ryaTableMutationsFactory;
+ private TableLayoutStrategy tableLayoutStrategy;
+ private AccumuloRyaQueryEngine queryEngine;
+ private RyaTripleContext ryaContext;
+
+ @Override
+ public boolean isInitialized() throws RyaDAOException {
+ return initialized;
+ }
+
+ @Override
+ public void init() throws RyaDAOException {
+ if (initialized) {
+ return;
+ }
+ try {
+ checkNotNull(conf);
+ checkNotNull(connector);
+
+ if(batchWriterConfig == null){
+ batchWriterConfig = new BatchWriterConfig();
+ batchWriterConfig.setMaxMemory(MAX_MEMORY);
+ batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS);
+ batchWriterConfig.setMaxWriteThreads(NUM_THREADS);
+ }
+
+ tableLayoutStrategy = conf.getTableLayoutStrategy();
+ ryaContext = RyaTripleContext.getInstance(conf);
+ ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext);
+
+ secondaryIndexers = conf.getAdditionalIndexers();
+
+ flushEachUpdate = conf.flushEachUpdate();
+
+ TableOperations tableOperations = connector.tableOperations();
+ AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo());
+ AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo());
+ AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp());
+ AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs());
+
+ for (AccumuloIndexer index : secondaryIndexers) {
+ index.setConf(conf);
+ }
+
+ mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig);
+
+ //get the batch writers for tables
+ bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo());
+ bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo());
+ bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp());
+
+ bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs());
+
+ for (AccumuloIndexer index : secondaryIndexers) {
+ index.setConnector(connector);
+ index.setMultiTableBatchWriter(mt_bw);
+ index.init();
+ }
+
+ queryEngine = new AccumuloRyaQueryEngine(connector, conf);
+
+ checkVersion();
+
+ initialized = true;
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public String getVersion() throws RyaDAOException {
+ String version = null;
+ CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf);
+ if (versIter.hasNext()) {
+ version = versIter.next().getObject().getData();
+ }
+ versIter.close();
+
+ return version;
+ }
+
+ @Override
+ public void add(RyaStatement statement) throws RyaDAOException {
+ commit(Iterators.singletonIterator(statement));
+ }
+
+ @Override
+ public void add(Iterator<RyaStatement> iter) throws RyaDAOException {
+ commit(iter);
+ }
+
+ @Override
+ public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException {
+ this.delete(Iterators.singletonIterator(stmt), aconf);
+ }
+
+ @Override
+ public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException {
+ try {
+ while (statements.hasNext()) {
+ RyaStatement stmt = statements.next();
+ //query first
+ CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf);
+ while (query.hasNext()) {
+ deleteSingleRyaStatement(query.next());
+ }
+
+ for (AccumuloIndexer index : secondaryIndexers) {
+ index.deleteStatement(stmt);
+ }
+ }
+ if (flushEachUpdate) { mt_bw.flush(); }
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException {
+ BatchDeleter bd_spo = null;
+ BatchDeleter bd_po = null;
+ BatchDeleter bd_osp = null;
+
+ try {
+ bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations());
+ bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations());
+ bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations());
+
+ bd_spo.setRanges(Collections.singleton(new Range()));
+ bd_po.setRanges(Collections.singleton(new Range()));
+ bd_osp.setRanges(Collections.singleton(new Range()));
+
+ for (RyaURI graph : graphs){
+ bd_spo.fetchColumnFamily(new Text(graph.getData()));
+ bd_po.fetchColumnFamily(new Text(graph.getData()));
+ bd_osp.fetchColumnFamily(new Text(graph.getData()));
+ }
+
+ bd_spo.delete();
+ bd_po.delete();
+ bd_osp.delete();
+
+ //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown
+// for (AccumuloIndex index : secondaryIndexers) {
+// index.dropGraph(graphs);
+// }
+
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ } finally {
+ if (bd_spo != null) {
+ bd_spo.close();
+ }
+ if (bd_po != null) {
+ bd_po.close();
+ }
+ if (bd_osp != null) {
+ bd_osp.close();
+ }
+ }
+
+ }
+
+ protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException {
+ Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt);
+ bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO));
+ bw_po.addMutations(map.get(TABLE_LAYOUT.PO));
+ bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP));
+ }
+
+ protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException {
+ try {
+ //TODO: Should have a lock here in case we are adding and committing at the same time
+ while (commitStatements.hasNext()) {
+ RyaStatement stmt = commitStatements.next();
+
+ Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt);
+ Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+ Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+ Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+ bw_spo.addMutations(spo);
+ bw_po.addMutations(po);
+ bw_osp.addMutations(osp);
+
+ for (AccumuloIndexer index : secondaryIndexers) {
+ index.storeStatement(stmt);
+ }
+ }
+
+ if (flushEachUpdate) { mt_bw.flush(); }
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public void destroy() throws RyaDAOException {
+ if (!initialized) {
+ return;
+ }
+ //TODO: write lock
+ try {
+ initialized = false;
+ mt_bw.flush();
+
+ mt_bw.close();
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ try {
+ indexer.destroy();
+ } catch(Exception e) {
+ logger.warn("Failed to destroy indexer", e);
+ }
+ }
+ }
+
+ @Override
+ public void addNamespace(String pfx, String namespace) throws RyaDAOException {
+ try {
+ Mutation m = new Mutation(new Text(pfx));
+ m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes()));
+ bw_ns.addMutation(m);
+ if (flushEachUpdate) { mt_bw.flush(); }
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public String getNamespace(String pfx) throws RyaDAOException {
+ try {
+ Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+ ALL_AUTHORIZATIONS);
+ scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+ scanner.setRange(new Range(new Text(pfx)));
+ Iterator<Map.Entry<Key, Value>> iterator = scanner
+ .iterator();
+
+ if (iterator.hasNext()) {
+ return new String(iterator.next().getValue().get());
+ }
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public void removeNamespace(String pfx) throws RyaDAOException {
+ try {
+ Mutation del = new Mutation(new Text(pfx));
+ del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT);
+ bw_ns.addMutation(del);
+ if (flushEachUpdate) { mt_bw.flush(); }
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException {
+ try {
+ Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(),
+ ALL_AUTHORIZATIONS);
+ scanner.fetchColumnFamily(INFO_NAMESPACE_TXT);
+ Iterator<Map.Entry<Key, Value>> result = scanner.iterator();
+ return new AccumuloNamespaceTableIterator(result);
+ } catch (Exception e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ @Override
+ public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() {
+ return this;
+ }
+
+ @Override
+ public void purge(RdfCloudTripleStoreConfiguration configuration) {
+ for (String tableName : getTables()) {
+ try {
+ purge(tableName, configuration.getAuths());
+ compact(tableName);
+ } catch (TableNotFoundException e) {
+ logger.error(e.getMessage());
+ } catch (MutationsRejectedException e) {
+ logger.error(e.getMessage());
+ }
+ }
+ for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ try {
+ indexer.purge(configuration);
+ } catch(Exception e) {
+ logger.error("Failed to purge indexer", e);
+ }
+ }
+ }
+
+ @Override
+ public void dropAndDestroy() throws RyaDAOException {
+ for (String tableName : getTables()) {
+ try {
+ drop(tableName);
+ } catch (AccumuloSecurityException e) {
+ logger.error(e.getMessage());
+ throw new RyaDAOException(e);
+ } catch (AccumuloException e) {
+ logger.error(e.getMessage());
+ throw new RyaDAOException(e);
+ } catch (TableNotFoundException e) {
+ logger.warn(e.getMessage());
+ }
+ }
+ destroy();
+ for(AccumuloIndexer indexer : this.secondaryIndexers) {
+ try {
+ indexer.dropAndDestroy();
+ } catch(Exception e) {
+ logger.error("Failed to drop and destroy indexer", e);
+ }
+ }
+ }
+
+ public Connector getConnector() {
+ return connector;
+ }
+
+ public void setConnector(Connector connector) {
+ this.connector = connector;
+ }
+
+ public BatchWriterConfig getBatchWriterConfig(){
+ return batchWriterConfig;
+ }
+
+ public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+ this.batchWriterConfig = batchWriterConfig;
+ }
+
+ protected MultiTableBatchWriter getMultiTableBatchWriter(){
+ return mt_bw;
+ }
+
+ @Override
+ public AccumuloRdfConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(AccumuloRdfConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public RyaTableMutationsFactory getRyaTableMutationsFactory() {
+ return ryaTableMutationsFactory;
+ }
+
+ public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+ this.ryaTableMutationsFactory = ryaTableMutationsFactory;
+ }
+
+ @Override
+ public AccumuloRyaQueryEngine getQueryEngine() {
+ return queryEngine;
+ }
+
+ public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+ this.queryEngine = queryEngine;
+ }
+
+ public void flush() throws RyaDAOException {
+ try {
+ mt_bw.flush();
+ } catch (MutationsRejectedException e) {
+ throw new RyaDAOException(e);
+ }
+ }
+
+ protected String[] getTables() {
+ // core tables
+ List<String> tableNames = Lists.newArrayList(
+ tableLayoutStrategy.getSpo(),
+ tableLayoutStrategy.getPo(),
+ tableLayoutStrategy.getOsp(),
+ tableLayoutStrategy.getNs(),
+ tableLayoutStrategy.getEval());
+
+ // Additional Tables
+ for (AccumuloIndexer index : secondaryIndexers) {
+ tableNames.add(index.getTableName());
+ }
+
+ return tableNames.toArray(new String[]{});
+ }
+
+ private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+ if (tableExists(tableName)) {
+ logger.info("Purging accumulo table: " + tableName);
+ BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+ try {
+ batchDeleter.setRanges(Collections.singleton(new Range()));
+ batchDeleter.delete();
+ } finally {
+ batchDeleter.close();
+ }
+ }
+ }
+
+ private void compact(String tableName) {
+ logger.info("Requesting major compaction for table " + tableName);
+ try {
+ connector.tableOperations().compact(tableName, null, null, true, false);
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ private boolean tableExists(String tableName) {
+ return getConnector().tableOperations().exists(tableName);
+ }
+
+ private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+ return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
+ }
+
+ private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException {
+ String version = getVersion();
+ if (version == null) {
+ //adding to core Rya tables but not Indexes
+ Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement());
+ Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO);
+ Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO);
+ Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP);
+ bw_spo.addMutations(spo);
+ bw_po.addMutations(po);
+ bw_osp.addMutations(osp);
+ }
+ //TODO: Do a version check here
+ }
+
+ protected RyaStatement getVersionRyaStatement() {
+ return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
+ }
+
+ private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ logger.info("Dropping cloudbase table: " + tableName);
+ connector.tableOperations().delete(tableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
new file mode 100644
index 0000000..b5a4e84
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/DefineTripleQueryRangeFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//package mvm.rya.accumulo;
+
+//
+//import com.google.common.io.ByteArrayDataOutput;
+//import com.google.common.io.ByteStreams;
+//import mvm.rya.api.RdfCloudTripleStoreUtils;
+//import mvm.rya.api.domain.RangeValue;
+//import org.apache.accumulo.core.data.Range;
+//import org.apache.hadoop.io.Text;
+//import org.openrdf.model.Value;
+//import org.openrdf.model.ValueFactory;
+//import org.openrdf.model.impl.ValueFactoryImpl;
+//
+//import java.io.IOException;
+//import java.util.Map;
+//
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry;
+//
+///**
+// * Class DefineTripleQueryRangeFactory
+// * Date: Jun 2, 2011
+// * Time: 10:35:43 AM
+// */
+//public class DefineTripleQueryRangeFactory {
+//
+// ValueFactory vf = ValueFactoryImpl.getInstance();
+//
+// protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty)
+// throws IOException {
+// if(!empty) {
+// startRowOut.write(DELIM_BYTES);
+// endRowOut.write(DELIM_BYTES);
+// }
+// //null check?
+// if(val instanceof RangeValue) {
+// RangeValue rangeValue = (RangeValue) val;
+// Value start = rangeValue.getStart();
+// Value end = rangeValue.getEnd();
+// byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start);
+// byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end);
+// startRowOut.write(start_val_bytes);
+// endRowOut.write(end_val_bytes);
+// } else {
+// byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val);
+// startRowOut.write(val_bytes);
+// endRowOut.write(val_bytes);
+// }
+// }
+//
+// public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf)
+// throws IOException {
+//
+// byte[] startrow, stoprow;
+// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
+// ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput();
+// Range range;
+// TABLE_LAYOUT tableLayout;
+//
+// if (subject != null) {
+// /**
+// * Case: s
+// * Table: spo
+// * Want this to be the first if statement since it will be most likely the most asked for table
+// */
+// tableLayout = TABLE_LAYOUT.SPO;
+// fillRange(startRowOut, stopRowOut, subject, true);
+// if (predicate != null) {
+// /**
+// * Case: sp
+// * Table: spo
+// */
+// fillRange(startRowOut, stopRowOut, predicate, false);
+// if (object != null) {
+// /**
+// * Case: spo
+// * Table: spo
+// */
+// fillRange(startRowOut, stopRowOut, object, false);
+// }
+// } else if (object != null) {
+// /**
+// * Case: so
+// * Table: osp
+// * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement
+// * for best performance. The SPO table probably gets the most scans, so I want it to be the first if
+// * statement in the branch.
+// */
+// tableLayout = TABLE_LAYOUT.OSP;
+// startRowOut = ByteStreams.newDataOutput();
+// stopRowOut = ByteStreams.newDataOutput();
+// fillRange(startRowOut, stopRowOut, object, true);
+// fillRange(startRowOut, stopRowOut, subject, false);
+// }
+// } else if (predicate != null) {
+// /**
+// * Case: p
+// * Table: po
+// * Wanted this to be the second if statement, since it will be the second most asked for table
+// */
+// tableLayout = TABLE_LAYOUT.PO;
+// fillRange(startRowOut, stopRowOut, predicate, true);
+// if (object != null) {
+// /**
+// * Case: po
+// * Table: po
+// */
+// fillRange(startRowOut, stopRowOut, object, false);
+// }
+// } else if (object != null) {
+// /**
+// * Case: o
+// * Table: osp
+// * Probably a pretty rare scenario
+// */
+// tableLayout = TABLE_LAYOUT.OSP;
+// fillRange(startRowOut, stopRowOut, object, true);
+// } else {
+// tableLayout = TABLE_LAYOUT.SPO;
+// stopRowOut.write(Byte.MAX_VALUE);
+// }
+//
+// startrow = startRowOut.toByteArray();
+// stopRowOut.write(DELIM_STOP_BYTES);
+// stoprow = stopRowOut.toByteArray();
+// Text startRowTxt = new Text(startrow);
+// Text stopRowTxt = new Text(stoprow);
+// range = new Range(startRowTxt, stopRowTxt);
+//
+// return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range);
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
new file mode 100644
index 0000000..574029e
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableKeyValues.java
@@ -0,0 +1,115 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableKeyValues {
+ public static final ColumnVisibility EMPTY_CV = new ColumnVisibility();
+ public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression());
+
+ RyaTripleContext instance;
+
+ private RyaStatement stmt;
+ private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>();
+ private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>();
+ private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>();
+
+ public RyaTableKeyValues(RyaStatement stmt, RdfCloudTripleStoreConfiguration conf) {
+ this.stmt = stmt;
+ this.instance = RyaTripleContext.getInstance(conf);
+ }
+
+ public Collection<Map.Entry<Key, Value>> getSpo() {
+ return spo;
+ }
+
+ public Collection<Map.Entry<Key, Value>> getPo() {
+ return po;
+ }
+
+ public Collection<Map.Entry<Key, Value>> getOsp() {
+ return osp;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public RyaTableKeyValues invoke() throws IOException {
+ /**
+ * TODO: If there are contexts, do we still replicate the information into the default graph as well
+ * as the named graphs?
+ */try {
+ Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt);
+ TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+ byte[] columnVisibility = tripleRow.getColumnVisibility();
+ Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility);
+ Long timestamp = tripleRow.getTimestamp();
+ timestamp = timestamp == null ? 0l : timestamp;
+ byte[] value = tripleRow.getValue();
+ Value v = value == null ? EMPTY_VALUE : new Value(value);
+ spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+ new Text(tripleRow.getColumnFamily()),
+ new Text(tripleRow.getColumnQualifier()),
+ cv, timestamp), v));
+ tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
+ po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+ new Text(tripleRow.getColumnFamily()),
+ new Text(tripleRow.getColumnQualifier()),
+ cv, timestamp), v));
+ tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
+ osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()),
+ new Text(tripleRow.getColumnFamily()),
+ new Text(tripleRow.getColumnQualifier()),
+ cv, timestamp), v));
+ } catch (TripleRowResolverException e) {
+ throw new IOException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "RyaTableKeyValues{" +
+ "statement=" + stmt +
+ ", spo=" + spo +
+ ", po=" + po +
+ ", o=" + osp +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
new file mode 100644
index 0000000..2a4871d
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/RyaTableMutationsFactory.java
@@ -0,0 +1,148 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+public class RyaTableMutationsFactory {
+
+ RyaTripleContext ryaContext;
+
+ public RyaTableMutationsFactory(RyaTripleContext ryaContext) {
+ this.ryaContext = ryaContext;
+ }
+
+ //TODO: Does this still need to be collections
+ public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize(
+ RyaStatement stmt) throws IOException {
+
+ Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+ Collection<Mutation> po_muts = new ArrayList<Mutation>();
+ Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+ /**
+ * TODO: If there are contexts, do we still replicate the information into the default graph as well
+ * as the named graphs?
+ */
+ try {
+ Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt);
+ TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+ spo_muts.add(createMutation(tripleRow));
+ tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+ po_muts.add(createMutation(tripleRow));
+ tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+ osp_muts.add(createMutation(tripleRow));
+ } catch (TripleRowResolverException fe) {
+ throw new IOException(fe);
+ }
+
+ Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations =
+ new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>();
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+ return mutations;
+ }
+
+ public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serializeDelete(
+ RyaStatement stmt) throws IOException {
+
+ Collection<Mutation> spo_muts = new ArrayList<Mutation>();
+ Collection<Mutation> po_muts = new ArrayList<Mutation>();
+ Collection<Mutation> osp_muts = new ArrayList<Mutation>();
+ /**
+ * TODO: If there are contexts, do we still replicate the information into the default graph as well
+ * as the named graphs?
+ */
+ try {
+ Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt);
+ TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO);
+ spo_muts.add(deleteMutation(tripleRow));
+ tripleRow = rowMap.get(TABLE_LAYOUT.PO);
+ po_muts.add(deleteMutation(tripleRow));
+ tripleRow = rowMap.get(TABLE_LAYOUT.OSP);
+ osp_muts.add(deleteMutation(tripleRow));
+ } catch (TripleRowResolverException fe) {
+ throw new IOException(fe);
+ }
+
+ Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations =
+ new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>();
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts);
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts);
+ mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts);
+
+ return mutations;
+
+ }
+
+ protected Mutation deleteMutation(TripleRow tripleRow) {
+ Mutation m = new Mutation(new Text(tripleRow.getRow()));
+
+ byte[] columnFamily = tripleRow.getColumnFamily();
+ Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+ byte[] columnQualifier = tripleRow.getColumnQualifier();
+ Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+
+ m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()),
+ tripleRow.getTimestamp());
+ return m;
+ }
+
+ protected Mutation createMutation(TripleRow tripleRow) {
+ Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+ byte[] columnVisibility = tripleRow.getColumnVisibility();
+ ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+ Long timestamp = tripleRow.getTimestamp();
+ byte[] value = tripleRow.getValue();
+ Value v = value == null ? EMPTY_VALUE : new Value(value);
+ byte[] columnQualifier = tripleRow.getColumnQualifier();
+ Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+ byte[] columnFamily = tripleRow.getColumnFamily();
+ Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+ mutation.put(cfText, cqText, cv, timestamp, v);
+ return mutation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
new file mode 100644
index 0000000..5df5da9
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AbstractAccumuloIndexer.java
@@ -0,0 +1,59 @@
+package mvm.rya.accumulo.experimental;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public abstract class AbstractAccumuloIndexer implements AccumuloIndexer {
+
+ @Override
+ public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+ }
+
+ @Override
+ public void storeStatements(Collection<RyaStatement> statements) throws IOException {
+ for (RyaStatement s : statements) {
+ storeStatement(s);
+ }
+ }
+
+ @Override
+ public void deleteStatement(RyaStatement stmt) throws IOException {
+ }
+
+ @Override
+ public void dropGraph(RyaURI... graphs) {
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
new file mode 100644
index 0000000..5581e08
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/experimental/AccumuloIndexer.java
@@ -0,0 +1,38 @@
+package mvm.rya.accumulo.experimental;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+public interface AccumuloIndexer extends RyaSecondaryIndexer {
+ public void init();
+ public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException;
+ public void setConnector(Connector connector);
+ public void destroy();
+ public void purge(RdfCloudTripleStoreConfiguration configuration);
+ public void dropAndDestroy();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
new file mode 100644
index 0000000..6e818b3
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
@@ -0,0 +1,229 @@
+package mvm.rya.accumulo.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map.Entry;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Result;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetailsRepository;
+
+/**
+ * An implementation of {@link RyaDetailsRepository} that stores a Rya
+ * instance's {@link RyaDetails} in an Accumulo table.
+ * </p>
+ * XXX
+ * This implementation writes the details object as a serialized byte array to
+ * a row in Accumulo. Storing the entire structure within a single value is
+ * attractive because Accumulo's conditional writer will let us do checkAndSet
+ * style operations to synchronize writes to the object. On the downside, only
+ * Java clients will work.
+ */
+@ParametersAreNonnullByDefault
+public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepository {
+
+ public static final String INSTANCE_DETAILS_TABLE_NAME = "instance_details";
+
+ private static final Text ROW_ID = new Text("instance metadata");
+ private static final Text COL_FAMILY = new Text("instance");
+ private static final Text COL_QUALIFIER = new Text("details");
+
+ private final RyaDetailsSerializer serializer = new RyaDetailsSerializer();
+
+ private final Connector connector;
+ private final String instanceName;
+ private final String detailsTableName;
+
+
+ /**
+ * Constructs an instance of {@link AccumuloRyaInstanceDetailsRepository}.
+ *
+ * @param connector - Connects to the instance of Accumulo that hosts the Rya instance. (not null)
+ * @param instanceName - The name of the Rya instance this repository represents. (not null)
+ */
+ public AccumuloRyaInstanceDetailsRepository(final Connector connector, final String instanceName) {
+ this.connector = requireNonNull( connector );
+ this.instanceName = requireNonNull( instanceName );
+ this.detailsTableName = instanceName + INSTANCE_DETAILS_TABLE_NAME;
+ }
+
+ @Override
+ public boolean isInitialized() throws RyaDetailsRepositoryException {
+ try {
+ final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+ scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
+ return scanner.iterator().hasNext();
+ } catch (final TableNotFoundException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException {
+ // Preconditions.
+ requireNonNull( details );
+
+ if(!details.getRyaInstanceName().equals( instanceName )) {
+ throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " +
+ "the instance name that this repository is connected to. Make sure you're connected to the" +
+ "correct Rya instance.");
+ }
+
+ if(isInitialized()) {
+ throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" +
+ instanceName + "'.");
+ }
+
+ // Create the table that hosts the details if it has not been created yet.
+ final TableOperations tableOps = connector.tableOperations();
+ if(!tableOps.exists(detailsTableName)) {
+ try {
+ tableOps.create(detailsTableName);
+ } catch (AccumuloException | AccumuloSecurityException | TableExistsException e) {
+ throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" +
+ instanceName + "' because the the table that holds that information could not be created.");
+ }
+ }
+
+ // Write the details to the table.
+ BatchWriter writer = null;
+ try {
+ writer = connector.createBatchWriter(detailsTableName, new BatchWriterConfig());
+
+ final byte[] bytes = serializer.serialize(details);
+ final Mutation mutation = new Mutation(ROW_ID);
+ mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(bytes));
+ writer.addMutation( mutation );
+
+ } catch (final TableNotFoundException | MutationsRejectedException e) {
+ throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
+ } finally {
+ if(writer != null) {
+ try {
+ writer.close();
+ } catch (final MutationsRejectedException e) {
+ throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException {
+ // Preconditions.
+ if(!isInitialized()) {
+ throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" +
+ instanceName + "' because it has not been initialized yet.");
+ }
+
+ // Read it from the table.
+ try {
+ // Fetch the value from the table.
+ final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
+ scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
+ final Entry<Key, Value> entry = scanner.iterator().next();
+
+ // Deserialize it.
+ final byte[] bytes = entry.getValue().get();
+ return serializer.deserialize( bytes );
+
+ } catch (final TableNotFoundException e) {
+ throw new RyaDetailsRepositoryException("Could not get the details from the table.", e);
+ }
+ }
+
+ @Override
+ public void update(final RyaDetails oldDetails, final RyaDetails newDetails)
+ throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException {
+ // Preconditions.
+ requireNonNull(oldDetails);
+ requireNonNull(newDetails);
+
+ if(!newDetails.getRyaInstanceName().equals( instanceName )) {
+ throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " +
+ "the instance name that this repository is connected to. Make sure you're connected to the" +
+ "correct Rya instance.");
+ }
+
+ if(!isInitialized()) {
+ throw new NotInitializedException("Could not update the details for the Rya instanced named '" +
+ instanceName + "' because it has not been initialized yet.");
+ }
+
+ // Use a conditional writer so that we can detect when the old details
+ // are no longer the currently stored ones.
+ ConditionalWriter writer = null;
+ try {
+ // Setup the condition that ensures the details have not changed since the edits were made.
+ final byte[] oldDetailsBytes = serializer.serialize(oldDetails);
+ final Condition condition = new Condition(COL_FAMILY, COL_QUALIFIER);
+ condition.setValue( oldDetailsBytes );
+
+ // Create the mutation that only performs the update if the details haven't changed.
+ final ConditionalMutation mutation = new ConditionalMutation(ROW_ID);
+ mutation.addCondition( condition );
+ final byte[] newDetailsBytes = serializer.serialize(newDetails);
+ mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(newDetailsBytes));
+
+ // Do the write.
+ writer = connector.createConditionalWriter(detailsTableName, new ConditionalWriterConfig());
+ final Result result = writer.write(mutation);
+ switch(result.getStatus()) {
+ case REJECTED:
+ case VIOLATED:
+ throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" +
+ instanceName + "' because the old value is out of date.");
+ case UNKNOWN:
+ case INVISIBLE_VISIBILITY:
+ throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.");
+ }
+ } catch (final TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+ throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.");
+ } finally {
+ if(writer != null) {
+ writer.close();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
new file mode 100644
index 0000000..8c863ea
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/RyaDetailsSerializer.java
@@ -0,0 +1,96 @@
+package mvm.rya.accumulo.instance;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+
+/**
+ * Serializes {@link RyaDetails} instances.
+ */
+@ParametersAreNonnullByDefault
+public class RyaDetailsSerializer {
+
+ /**
+ * Serializes an instance of {@link RyaDetails}.
+ *
+ * @param details - The details that will be serialized. (not null)
+ * @return The serialized details.
+ */
+ public byte[] serialize(final RyaDetails details) throws SerializationException {
+ requireNonNull(details);
+
+ try {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ new ObjectOutputStream(stream).writeObject( details );
+ return stream.toByteArray();
+ } catch (final IOException e) {
+ throw new SerializationException("Could not serialize an instance of RyaDetails.", e);
+ }
+ }
+
+ /**
+ * Deserializes an instance of {@link RyaDetails}.
+ *
+ * @param bytes - The serialized for of a {@link RyaDetails}. (not null)
+ * @return The deserialized object.
+ */
+ public RyaDetails deserialize(final byte[] bytes) throws SerializationException {
+ requireNonNull(bytes);
+
+ try {
+ final ByteArrayInputStream stream = new ByteArrayInputStream( bytes );
+ final Object o = new ObjectInputStream( stream ).readObject();
+
+ if(! (o instanceof RyaDetails) ) {
+ throw new SerializationException("Wrong type of object was deserialized. Class: " + o.getClass().getName() );
+ }
+
+ return (RyaDetails) o;
+ } catch (final ClassNotFoundException | IOException e) {
+ throw new SerializationException("Could not deserialize an instance of RyaDetails.", e);
+ }
+ }
+
+ /**
+ * Could not serialize an instance of {@link RyaDetails}.
+ */
+ public static class SerializationException extends RyaDetailsRepositoryException {
+ private static final long serialVersionUID = 1L;
+
+ public SerializationException(final String message) {
+ super(message);
+ }
+
+ public SerializationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file