You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2016/10/15 20:06:56 UTC
[36/69] [abbrv] [partial] incubator-rya git commit: RYA-198 Renaming
Files
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
deleted file mode 100644
index 5581e08..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package mvm.rya.accumulo.experimental;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.io.IOException;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.persist.index.RyaSecondaryIndexer;
-
-public interface AccumuloIndexer extends RyaSecondaryIndexer {
- public void init();
- public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException;
- public void setConnector(Connector connector);
- public void destroy();
- public void purge(RdfCloudTripleStoreConfiguration configuration);
- public void dropAndDestroy();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
deleted file mode 100644
index 6e818b3..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java
+++ /dev/null
@@ -1,229 +0,0 @@
-package mvm.rya.accumulo.instance;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map.Entry;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ConditionalWriter;
-import org.apache.accumulo.core.client.ConditionalWriter.Result;
-import org.apache.accumulo.core.client.ConditionalWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.data.Condition;
-import org.apache.accumulo.core.data.ConditionalMutation;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-
-import mvm.rya.api.instance.RyaDetails;
-import mvm.rya.api.instance.RyaDetailsRepository;
-
-/**
- * An implementation of {@link RyaDetailsRepository} that stores a Rya
- * instance's {@link RyaDetails} in an Accumulo table.
- * </p>
- * XXX
- * This implementation writes the details object as a serialized byte array to
- * a row in Accumulo. Storing the entire structure within a single value is
- * attractive because Accumulo's conditional writer will let us do checkAndSet
- * style operations to synchronize writes to the object. On the downside, only
- * Java clients will work.
- */
-@ParametersAreNonnullByDefault
-public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepository {
-
- public static final String INSTANCE_DETAILS_TABLE_NAME = "instance_details";
-
- private static final Text ROW_ID = new Text("instance metadata");
- private static final Text COL_FAMILY = new Text("instance");
- private static final Text COL_QUALIFIER = new Text("details");
-
- private final RyaDetailsSerializer serializer = new RyaDetailsSerializer();
-
- private final Connector connector;
- private final String instanceName;
- private final String detailsTableName;
-
-
- /**
- * Constructs an instance of {@link AccumuloRyaInstanceDetailsRepository}.
- *
- * @param connector - Connects to the instance of Accumulo that hosts the Rya instance. (not null)
- * @param instanceName - The name of the Rya instance this repository represents. (not null)
- */
- public AccumuloRyaInstanceDetailsRepository(final Connector connector, final String instanceName) {
- this.connector = requireNonNull( connector );
- this.instanceName = requireNonNull( instanceName );
- this.detailsTableName = instanceName + INSTANCE_DETAILS_TABLE_NAME;
- }
-
- @Override
- public boolean isInitialized() throws RyaDetailsRepositoryException {
- try {
- final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
- scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
- return scanner.iterator().hasNext();
- } catch (final TableNotFoundException e) {
- return false;
- }
- }
-
- @Override
- public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException {
- // Preconditions.
- requireNonNull( details );
-
- if(!details.getRyaInstanceName().equals( instanceName )) {
- throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " +
- "the instance name that this repository is connected to. Make sure you're connected to the" +
- "correct Rya instance.");
- }
-
- if(isInitialized()) {
- throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" +
- instanceName + "'.");
- }
-
- // Create the table that hosts the details if it has not been created yet.
- final TableOperations tableOps = connector.tableOperations();
- if(!tableOps.exists(detailsTableName)) {
- try {
- tableOps.create(detailsTableName);
- } catch (AccumuloException | AccumuloSecurityException | TableExistsException e) {
- throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" +
- instanceName + "' because the the table that holds that information could not be created.");
- }
- }
-
- // Write the details to the table.
- BatchWriter writer = null;
- try {
- writer = connector.createBatchWriter(detailsTableName, new BatchWriterConfig());
-
- final byte[] bytes = serializer.serialize(details);
- final Mutation mutation = new Mutation(ROW_ID);
- mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(bytes));
- writer.addMutation( mutation );
-
- } catch (final TableNotFoundException | MutationsRejectedException e) {
- throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
- } finally {
- if(writer != null) {
- try {
- writer.close();
- } catch (final MutationsRejectedException e) {
- throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e);
- }
- }
- }
- }
-
- @Override
- public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException {
- // Preconditions.
- if(!isInitialized()) {
- throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" +
- instanceName + "' because it has not been initialized yet.");
- }
-
- // Read it from the table.
- try {
- // Fetch the value from the table.
- final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations());
- scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER);
- final Entry<Key, Value> entry = scanner.iterator().next();
-
- // Deserialize it.
- final byte[] bytes = entry.getValue().get();
- return serializer.deserialize( bytes );
-
- } catch (final TableNotFoundException e) {
- throw new RyaDetailsRepositoryException("Could not get the details from the table.", e);
- }
- }
-
- @Override
- public void update(final RyaDetails oldDetails, final RyaDetails newDetails)
- throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException {
- // Preconditions.
- requireNonNull(oldDetails);
- requireNonNull(newDetails);
-
- if(!newDetails.getRyaInstanceName().equals( instanceName )) {
- throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " +
- "the instance name that this repository is connected to. Make sure you're connected to the" +
- "correct Rya instance.");
- }
-
- if(!isInitialized()) {
- throw new NotInitializedException("Could not update the details for the Rya instanced named '" +
- instanceName + "' because it has not been initialized yet.");
- }
-
- // Use a conditional writer so that we can detect when the old details
- // are no longer the currently stored ones.
- ConditionalWriter writer = null;
- try {
- // Setup the condition that ensures the details have not changed since the edits were made.
- final byte[] oldDetailsBytes = serializer.serialize(oldDetails);
- final Condition condition = new Condition(COL_FAMILY, COL_QUALIFIER);
- condition.setValue( oldDetailsBytes );
-
- // Create the mutation that only performs the update if the details haven't changed.
- final ConditionalMutation mutation = new ConditionalMutation(ROW_ID);
- mutation.addCondition( condition );
- final byte[] newDetailsBytes = serializer.serialize(newDetails);
- mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(newDetailsBytes));
-
- // Do the write.
- writer = connector.createConditionalWriter(detailsTableName, new ConditionalWriterConfig());
- final Result result = writer.write(mutation);
- switch(result.getStatus()) {
- case REJECTED:
- case VIOLATED:
- throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" +
- instanceName + "' because the old value is out of date.");
- case UNKNOWN:
- case INVISIBLE_VISIBILITY:
- throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.");
- }
- } catch (final TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
- throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'.");
- } finally {
- if(writer != null) {
- writer.close();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java
deleted file mode 100644
index 8c863ea..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package mvm.rya.accumulo.instance;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import mvm.rya.api.instance.RyaDetails;
-import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-
-/**
- * Serializes {@link RyaDetails} instances.
- */
-@ParametersAreNonnullByDefault
-public class RyaDetailsSerializer {
-
- /**
- * Serializes an instance of {@link RyaDetails}.
- *
- * @param details - The details that will be serialized. (not null)
- * @return The serialized details.
- */
- public byte[] serialize(final RyaDetails details) throws SerializationException {
- requireNonNull(details);
-
- try {
- final ByteArrayOutputStream stream = new ByteArrayOutputStream();
- new ObjectOutputStream(stream).writeObject( details );
- return stream.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Could not serialize an instance of RyaDetails.", e);
- }
- }
-
- /**
- * Deserializes an instance of {@link RyaDetails}.
- *
- * @param bytes - The serialized for of a {@link RyaDetails}. (not null)
- * @return The deserialized object.
- */
- public RyaDetails deserialize(final byte[] bytes) throws SerializationException {
- requireNonNull(bytes);
-
- try {
- final ByteArrayInputStream stream = new ByteArrayInputStream( bytes );
- final Object o = new ObjectInputStream( stream ).readObject();
-
- if(! (o instanceof RyaDetails) ) {
- throw new SerializationException("Wrong type of object was deserialized. Class: " + o.getClass().getName() );
- }
-
- return (RyaDetails) o;
- } catch (final ClassNotFoundException | IOException e) {
- throw new SerializationException("Could not deserialize an instance of RyaDetails.", e);
- }
- }
-
- /**
- * Could not serialize an instance of {@link RyaDetails}.
- */
- public static class SerializationException extends RyaDetailsRepositoryException {
- private static final long serialVersionUID = 1L;
-
- public SerializationException(final String message) {
- super(message);
- }
-
- public SerializationException(final String message, final Throwable cause) {
- super(message, cause);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
deleted file mode 100644
index ba3ffd2..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java
+++ /dev/null
@@ -1,410 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaRange;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.layout.TableLayoutStrategy;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.persist.query.BatchRyaQuery;
-import mvm.rya.api.persist.query.RyaQuery;
-import mvm.rya.api.persist.query.RyaQueryEngine;
-import mvm.rya.api.query.strategy.ByteRange;
-import mvm.rya.api.query.strategy.TriplePatternStrategy;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRowRegex;
-import mvm.rya.api.utils.CloseableIterableIteration;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.accumulo.core.iterators.user.TimestampFilter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.calrissian.mango.collect.CloseableIterable;
-import org.calrissian.mango.collect.CloseableIterables;
-import org.calrissian.mango.collect.FluentCloseableIterable;
-import org.openrdf.query.BindingSet;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterators;
-
-/**
- * Date: 7/17/12
- * Time: 9:28 AM
- */
-public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> {
-
- private AccumuloRdfConfiguration configuration;
- private Connector connector;
- private RyaTripleContext ryaContext;
- private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>();
-
- public AccumuloRyaQueryEngine(Connector connector) {
- this(connector, new AccumuloRdfConfiguration());
- }
-
- public AccumuloRyaQueryEngine(Connector connector, AccumuloRdfConfiguration conf) {
- this.connector = connector;
- this.configuration = conf;
- ryaContext = RyaTripleContext.getInstance(conf);
- keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO, ryaContext));
- keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO, ryaContext));
- keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP, ryaContext));
- }
-
- @Override
- public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
-
- RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build();
- CloseableIterable<RyaStatement> results = query(ryaQuery);
-
- return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
- }
-
- protected String getData(RyaType ryaType) {
- return (ryaType != null) ? (ryaType.getData()) : (null);
- }
-
- @Override
- public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
- //query configuration
- Authorizations authorizations = conf.getAuthorizations();
- Long ttl = conf.getTtl();
- Long maxResults = conf.getLimit();
- Integer maxRanges = conf.getMaxRangesForScanner();
- Integer numThreads = conf.getNumThreads();
-
- //TODO: cannot span multiple tables here
- try {
- Collection<Range> ranges = new HashSet<Range>();
- RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
- TABLE_LAYOUT layout = null;
- RyaURI context = null;
- TriplePatternStrategy strategy = null;
- for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
- RyaStatement stmt = stmtbs.getKey();
- context = stmt.getContext(); //TODO: This will be overwritten
- BindingSet bs = stmtbs.getValue();
- strategy = ryaContext.retrieveStrategy(stmt);
- if (strategy == null) {
- throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
- }
-
- Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
- strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);
-
- //use range to set scanner
- //populate scanner based on authorizations, ttl
- layout = entry.getKey();
- ByteRange byteRange = entry.getValue();
- Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
- ranges.add(range);
- rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
- }
- //no ranges
- if (layout == null) return null;
- String regexSubject = conf.getRegexSubject();
- String regexPredicate = conf.getRegexPredicate();
- String regexObject = conf.getRegexObject();
- TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
-
- String table = layoutToTable(layout, conf);
- boolean useBatchScanner = ranges.size() > maxRanges;
- RyaStatementBindingSetKeyValueIterator iterator = null;
- if (useBatchScanner) {
- ScannerBase scanner = connector.createBatchScanner(table, authorizations, numThreads);
- ((BatchScanner) scanner).setRanges(ranges);
- fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf);
- iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap);
- } else {
- Scanner scannerBase = null;
- Iterator<Map.Entry<Key, Value>>[] iters = new Iterator[ranges.size()];
- int i = 0;
- for (Range range : ranges) {
- scannerBase = connector.createScanner(table, authorizations);
- scannerBase.setRange(range);
- fillScanner(scannerBase, context, null, ttl, null, tripleRowRegex, conf);
- iters[i] = scannerBase.iterator();
- i++;
- }
- iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters), rangeMap, ryaContext);
- }
- if (maxResults != null) {
- iterator.setMaxResults(maxResults);
- }
- return iterator;
- } catch (Exception e) {
- throw new RyaDAOException(e);
- }
-
- }
-
- @Override
- public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, AccumuloRdfConfiguration conf)
- throws RyaDAOException {
- if (conf == null) {
- conf = configuration;
- }
-
- BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build();
- CloseableIterable<RyaStatement> results = query(batchRyaQuery);
-
- return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results);
- }
-
- @Override
- public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException {
- Preconditions.checkNotNull(ryaQuery);
- RyaStatement stmt = ryaQuery.getQuery();
- Preconditions.checkNotNull(stmt);
-
- //query configuration
- String[] auths = ryaQuery.getAuths();
- Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
- Long ttl = ryaQuery.getTtl();
- Long currentTime = ryaQuery.getCurrentTime();
- Long maxResults = ryaQuery.getMaxResults();
- Integer batchSize = ryaQuery.getBatchSize();
- String regexSubject = ryaQuery.getRegexSubject();
- String regexPredicate = ryaQuery.getRegexPredicate();
- String regexObject = ryaQuery.getRegexObject();
- TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
-
- try {
- //find triple pattern range
- TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
- TABLE_LAYOUT layout;
- Range range;
- RyaURI subject = stmt.getSubject();
- RyaURI predicate = stmt.getPredicate();
- RyaType object = stmt.getObject();
- RyaURI context = stmt.getContext();
- String qualifier = stmt.getQualifer();
- TripleRowRegex tripleRowRegex = null;
- if (strategy != null) {
- //otherwise, full table scan is supported
- Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
- strategy.defineRange(subject, predicate, object, context, null);
- layout = entry.getKey();
- ByteRange byteRange = entry.getValue();
- range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
-
- } else {
- range = new Range();
- layout = TABLE_LAYOUT.SPO;
- strategy = ryaContext.retrieveStrategy(layout);
- }
-
- byte[] objectTypeInfo = null;
- if (object != null) {
- //TODO: Not good to serialize this twice
- if (object instanceof RyaRange) {
- objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
- } else {
- objectTypeInfo = RyaContext.getInstance().serializeType(object)[1];
- }
- }
-
- tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo);
-
- //use range to set scanner
- //populate scanner based on authorizations, ttl
- String table = layoutToTable(layout, tableLayoutStrategy);
- Scanner scanner = connector.createScanner(table, authorizations);
- scanner.setRange(range);
- if (batchSize != null) {
- scanner.setBatchSize(batchSize);
- }
- fillScanner(scanner, context, qualifier, ttl, currentTime, tripleRowRegex, ryaQuery.getConf());
-
- FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner))
- .transform(keyValueToRyaStatementFunctionMap.get(layout));
- if (maxResults != null) {
- results = results.limit(maxResults.intValue());
- }
-
- return results;
- } catch (Exception e) {
- throw new RyaDAOException(e);
- }
- }
-
- @Override
- public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException {
- Preconditions.checkNotNull(ryaQuery);
- Iterable<RyaStatement> stmts = ryaQuery.getQueries();
- Preconditions.checkNotNull(stmts);
-
- //query configuration
- String[] auths = ryaQuery.getAuths();
- final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations();
- final Long ttl = ryaQuery.getTtl();
- Long currentTime = ryaQuery.getCurrentTime();
- Long maxResults = ryaQuery.getMaxResults();
- Integer batchSize = ryaQuery.getBatchSize();
- Integer numQueryThreads = ryaQuery.getNumQueryThreads();
- String regexSubject = ryaQuery.getRegexSubject();
- String regexPredicate = ryaQuery.getRegexPredicate();
- String regexObject = ryaQuery.getRegexObject();
- TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy();
- int maxRanges = ryaQuery.getMaxRanges();
-
- //TODO: cannot span multiple tables here
- try {
- Collection<Range> ranges = new HashSet<Range>();
- TABLE_LAYOUT layout = null;
- RyaURI context = null;
- TriplePatternStrategy strategy = null;
- for (RyaStatement stmt : stmts) {
- context = stmt.getContext(); //TODO: This will be overwritten
- strategy = ryaContext.retrieveStrategy(stmt);
- if (strategy == null) {
- throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported");
- }
-
- Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry =
- strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);
-
- //use range to set scanner
- //populate scanner based on authorizations, ttl
- layout = entry.getKey();
- ByteRange byteRange = entry.getValue();
- Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
- ranges.add(range);
- }
- //no ranges
- if (layout == null) throw new IllegalArgumentException("No table layout specified");
-
- final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
-
- final String table = layoutToTable(layout, tableLayoutStrategy);
- boolean useBatchScanner = ranges.size() > maxRanges;
- FluentCloseableIterable<RyaStatement> results = null;
- if (useBatchScanner) {
- BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads);
- scanner.setRanges(ranges);
- fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf());
- results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
- } else {
- final RyaURI fcontext = context;
- final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf();
- FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() {
- @Override
- public Iterable<Map.Entry<Key, Value>> apply(Range range) {
- try {
- Scanner scanner = connector.createScanner(table, authorizations);
- scanner.setRange(range);
- fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf);
- return scanner;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }).transform(keyValueToRyaStatementFunctionMap.get(layout));
-
- results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
- }
- if (maxResults != null) {
- results = results.limit(maxResults.intValue());
- }
- return results;
- } catch (Exception e) {
- throw new RyaDAOException(e);
- }
- }
-
- protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException {
- if (context != null && qualifier != null) {
- scanner.fetchColumn(new Text(context.getData()), new Text(qualifier));
- } else if (context != null) {
- scanner.fetchColumnFamily(new Text(context.getData()));
- } else if (qualifier != null) {
- IteratorSetting setting = new IteratorSetting(8, "riq", RegExFilter.class.getName());
- RegExFilter.setRegexs(setting, null, null, qualifier, null, false);
- scanner.addScanIterator(setting);
- }
- if (ttl != null) {
- IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName());
- TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true);
- if(currentTime != null){
- TimestampFilter.setStart(setting, currentTime - ttl, true);
- TimestampFilter.setEnd(setting, currentTime, true);
- }
- scanner.addScanIterator(setting);
- }
- if (tripleRowRegex != null) {
- IteratorSetting setting = new IteratorSetting(11, "ri", RegExFilter.class.getName());
- String regex = tripleRowRegex.getRow();
- RegExFilter.setRegexs(setting, regex, null, null, null, false);
- scanner.addScanIterator(setting);
- }
- if (conf instanceof AccumuloRdfConfiguration) {
- //TODO should we take the iterator settings as is or should we adjust the priority based on the above?
- for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) {
- scanner.addScanIterator(itr);
- }
- }
- }
-
- @Override
- public void setConf(AccumuloRdfConfiguration conf) {
- this.configuration = conf;
- }
-
- @Override
- public AccumuloRdfConfiguration getConf() {
- return configuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java
deleted file mode 100644
index 2813438..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.util.Map;
-
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-
-import com.google.common.base.Function;
-
-/**
- * Date: 1/30/13
- * Time: 2:09 PM
- */
-public class KeyValueToRyaStatementFunction implements Function<Map.Entry<Key, Value>, RyaStatement> {
-
- private TABLE_LAYOUT tableLayout;
- private RyaTripleContext context;
-
- public KeyValueToRyaStatementFunction(TABLE_LAYOUT tableLayout, RyaTripleContext context) {
- this.tableLayout = tableLayout;
- this.context = context;
- }
-
- @Override
- public RyaStatement apply(Map.Entry<Key, Value> input) {
- Key key = input.getKey();
- Value value = input.getValue();
- RyaStatement statement = null;
- try {
- statement = context.deserializeTriple(tableLayout,
- new TripleRow(key.getRowData().toArray(),
- key.getColumnFamilyData().toArray(),
- key.getColumnQualifierData().toArray(),
- key.getTimestamp(),
- key.getColumnVisibilityData().toArray(),
- (value != null) ? value.get() : null
- ));
- } catch (TripleRowResolverException e) {
- throw new RuntimeException(e);
- }
-
- return statement;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java
deleted file mode 100644
index c59cb87..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.openrdf.query.BindingSet;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Class RangeBindingSetCollection
- * Date: Feb 23, 2011
- * Time: 10:15:48 AM
- */
-public class RangeBindingSetEntries {
- public Collection<Map.Entry<Range, BindingSet>> ranges;
-
- public RangeBindingSetEntries() {
- this(new ArrayList<Map.Entry<Range, BindingSet>>());
- }
-
- public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> ranges) {
- this.ranges = ranges;
- }
-
- public Collection<BindingSet> containsKey(Key key) {
- //TODO: need to find a better way to sort these and pull
- //TODO: maybe fork/join here
- Collection<BindingSet> bss = new ArrayList<BindingSet>();
- for (Map.Entry<Range, BindingSet> entry : ranges) {
- if (entry.getKey().contains(key))
- bss.add(entry.getValue());
- }
- return bss;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java
deleted file mode 100644
index b4333bd..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.openrdf.query.BindingSet;
-
-/**
- * Date: 7/17/12
- * Time: 11:48 AM
- */
-public class RyaStatementBindingSetKeyValueIterator implements CloseableIteration<Map.Entry<RyaStatement, BindingSet>, RyaDAOException> {
- private Iterator<Map.Entry<Key, Value>> dataIterator;
- private TABLE_LAYOUT tableLayout;
- private Long maxResults = -1L;
- private ScannerBase scanner;
- private boolean isBatchScanner;
- private RangeBindingSetEntries rangeMap;
- private Iterator<BindingSet> bsIter;
- private RyaStatement statement;
- private RyaTripleContext ryaContext;
-
- public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, ScannerBase scannerBase, RangeBindingSetEntries rangeMap) {
- this(tableLayout, ((scannerBase instanceof BatchScanner) ? ((BatchScanner) scannerBase).iterator() : ((Scanner) scannerBase).iterator()), rangeMap, context);
- this.scanner = scannerBase;
- isBatchScanner = scanner instanceof BatchScanner;
- }
-
- public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, Iterator<Map.Entry<Key, Value>> dataIterator, RangeBindingSetEntries rangeMap, RyaTripleContext ryaContext) {
- this.tableLayout = tableLayout;
- this.rangeMap = rangeMap;
- this.dataIterator = dataIterator;
- this.ryaContext = ryaContext;
- }
-
- @Override
- public void close() throws RyaDAOException {
- dataIterator = null;
- if (scanner != null && isBatchScanner) {
- ((BatchScanner) scanner).close();
- }
- }
-
- public boolean isClosed() throws RyaDAOException {
- return dataIterator == null;
- }
-
- @Override
- public boolean hasNext() throws RyaDAOException {
- if (isClosed()) {
- return false;
- }
- if (maxResults != 0) {
- if (bsIter != null && bsIter.hasNext()) {
- return true;
- }
- if (dataIterator.hasNext()) {
- return true;
- } else {
- maxResults = 0l;
- return false;
- }
- }
- return false;
- }
-
- @Override
- public Map.Entry<RyaStatement, BindingSet> next() throws RyaDAOException {
- if (!hasNext() || isClosed()) {
- throw new NoSuchElementException();
- }
-
- try {
- while (true) {
- if (bsIter != null && bsIter.hasNext()) {
- maxResults--;
- return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, bsIter.next());
- }
-
- if (dataIterator.hasNext()) {
- Map.Entry<Key, Value> next = dataIterator.next();
- Key key = next.getKey();
- statement = ryaContext.deserializeTriple(tableLayout,
- new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
- key.getTimestamp(), key.getColumnVisibilityData().toArray(), next.getValue().get()));
- if (next.getValue() != null) {
- statement.setValue(next.getValue().get());
- }
- Collection<BindingSet> bindingSets = rangeMap.containsKey(key);
- if (!bindingSets.isEmpty()) {
- bsIter = bindingSets.iterator();
- }
- } else {
- break;
- }
- }
- return null;
- } catch (TripleRowResolverException e) {
- throw new RyaDAOException(e);
- }
- }
-
- @Override
- public void remove() throws RyaDAOException {
- next();
- }
-
- public Long getMaxResults() {
- return maxResults;
- }
-
- public void setMaxResults(Long maxResults) {
- this.maxResults = maxResults;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java
deleted file mode 100644
index f4c3081..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-
-/**
- * Date: 7/17/12
- * Time: 11:48 AM
- */
-public class RyaStatementKeyValueIterator implements CloseableIteration<RyaStatement, RyaDAOException> {
- private Iterator<Map.Entry<Key, Value>> dataIterator;
- private TABLE_LAYOUT tableLayout;
- private Long maxResults = -1L;
- private RyaTripleContext context;
-
- public RyaStatementKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, Iterator<Map.Entry<Key, Value>> dataIterator) {
- this.tableLayout = tableLayout;
- this.dataIterator = dataIterator;
- this.context = context;
- }
-
- @Override
- public void close() throws RyaDAOException {
- dataIterator = null;
- }
-
- public boolean isClosed() throws RyaDAOException {
- return dataIterator == null;
- }
-
- @Override
- public boolean hasNext() throws RyaDAOException {
- if (isClosed()) {
- throw new RyaDAOException("Closed Iterator");
- }
- return maxResults != 0 && dataIterator.hasNext();
- }
-
- @Override
- public RyaStatement next() throws RyaDAOException {
- if (!hasNext()) {
- return null;
- }
-
- try {
- Map.Entry<Key, Value> next = dataIterator.next();
- Key key = next.getKey();
- RyaStatement statement = context.deserializeTriple(tableLayout,
- new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
- key.getTimestamp(), key.getColumnVisibilityData().toArray(), next.getValue().get()));
- if (next.getValue() != null) {
- statement.setValue(next.getValue().get());
- }
- maxResults--;
- return statement;
- } catch (TripleRowResolverException e) {
- throw new RyaDAOException(e);
- }
- }
-
- @Override
- public void remove() throws RyaDAOException {
- next();
- }
-
- public Long getMaxResults() {
- return maxResults;
- }
-
- public void setMaxResults(Long maxResults) {
- this.maxResults = maxResults;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java
deleted file mode 100644
index d2dcef9..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package mvm.rya.accumulo.query;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.client.ScannerBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.calrissian.mango.collect.AbstractCloseableIterable;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Date: 1/30/13
- * Time: 2:15 PM
- */
-public class ScannerBaseCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
-
- protected ScannerBase scanner;
-
- public ScannerBaseCloseableIterable(ScannerBase scanner) {
- Preconditions.checkNotNull(scanner);
- this.scanner = scanner;
- }
-
- @Override
- protected void doClose() throws IOException {
- scanner.close();
- }
-
- @Override
- protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
- return scanner.iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java
deleted file mode 100644
index 97d2f54..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package mvm.rya.accumulo.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.Filter;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.OptionDescriber;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Set the startTime and timeRange. The filter will only keyValues that
- * are within the range [startTime - timeRange, startTime].
- */
-public class TimeRangeFilter extends Filter {
- private long timeRange;
- private long startTime;
- public static final String TIME_RANGE_PROP = "timeRange";
- public static final String START_TIME_PROP = "startTime";
-
- @Override
- public boolean accept(Key k, Value v) {
- long diff = startTime - k.getTimestamp();
- return !(diff > timeRange || diff < 0);
- }
-
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- if (options == null) {
- throw new IllegalArgumentException("options must be set for TimeRangeFilter");
- }
-
- timeRange = -1;
- String timeRange_s = options.get(TIME_RANGE_PROP);
- if (timeRange_s == null)
- throw new IllegalArgumentException("timeRange must be set for TimeRangeFilter");
-
- timeRange = Long.parseLong(timeRange_s);
-
- String time = options.get(START_TIME_PROP);
- if (time != null)
- startTime = Long.parseLong(time);
- else
- startTime = System.currentTimeMillis();
- }
-
- @Override
- public OptionDescriber.IteratorOptions describeOptions() {
- Map<String, String> options = new TreeMap<String, String>();
- options.put(TIME_RANGE_PROP, "time range from the startTime (milliseconds)");
- options.put(START_TIME_PROP, "if set, use the given value as the absolute time in milliseconds as the start time in the time range.");
- return new OptionDescriber.IteratorOptions("timeRangeFilter", "TimeRangeFilter removes entries with timestamps outside of the given time range: " +
- "[startTime - timeRange, startTime]",
- options, null);
- }
-
- @Override
- public boolean validateOptions(Map<String, String> options) {
- Long.parseLong(options.get(TIME_RANGE_PROP));
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java
deleted file mode 100644
index cc4edca..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package mvm.rya.accumulo.utils;
-
-import static java.util.Objects.requireNonNull;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.apache.accumulo.core.security.ColumnVisibility;
-
-import com.google.common.base.Charsets;
-
-/**
- * Simplifies Accumulo visibility expressions.
- */
-@ParametersAreNonnullByDefault
-public class VisibilitySimplifier {
-
- /**
- * Simplifies an Accumulo visibility expression.
- *
- * @param visibility - The expression to simplify. (not null)
- * @return A simplified form of {@code visibility}.
- */
- public String simplify(final String visibility) {
- requireNonNull(visibility);
-
- String last = visibility;
- String simplified = new String(new ColumnVisibility(visibility).flatten(), Charsets.UTF_8);
-
- while(!simplified.equals(last)) {
- last = simplified;
- simplified = new String(new ColumnVisibility(simplified).flatten(), Charsets.UTF_8);
- }
-
- return simplified;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java
new file mode 100644
index 0000000..ebca6a2
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java
@@ -0,0 +1,99 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import com.google.common.base.Preconditions;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.persist.RdfDAOException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.openrdf.model.Namespace;
+import org.openrdf.model.impl.NamespaceImpl;
+
+import java.io.IOError;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+public class AccumuloNamespaceTableIterator<T extends Namespace> implements
+ CloseableIteration<Namespace, RdfDAOException> {
+
+ private boolean open = false;
+ private Iterator<Entry<Key, Value>> result;
+
+ public AccumuloNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException {
+ Preconditions.checkNotNull(result);
+ open = true;
+ this.result = result;
+ }
+
+ @Override
+ public void close() throws RdfDAOException {
+ try {
+ verifyIsOpen();
+ open = false;
+ } catch (IOError e) {
+ throw new RdfDAOException(e);
+ }
+ }
+
+ public void verifyIsOpen() throws RdfDAOException {
+ if (!open) {
+ throw new RdfDAOException("Iterator not open");
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws RdfDAOException {
+ verifyIsOpen();
+ return result != null && result.hasNext();
+ }
+
+ @Override
+ public Namespace next() throws RdfDAOException {
+ if (hasNext()) {
+ return getNamespace(result);
+ }
+ return null;
+ }
+
+ public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) {
+ for (; rowResults.hasNext(); ) {
+ Entry<Key, Value> next = rowResults.next();
+ Key key = next.getKey();
+ Value val = next.getValue();
+ String cf = key.getColumnFamily().toString();
+ String cq = key.getColumnQualifier().toString();
+ return new NamespaceImpl(key.getRow().toString(), new String(
+ val.get()));
+ }
+ return null;
+ }
+
+ @Override
+ public void remove() throws RdfDAOException {
+ next();
+ }
+
+ public boolean isOpen() {
+ return open;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
new file mode 100644
index 0000000..709ceb9
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
@@ -0,0 +1,158 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import mvm.rya.accumulo.experimental.AccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by IntelliJ IDEA.
+ * Date: 4/25/12
+ * Time: 3:24 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration {
+
+ public static final String MAXRANGES_SCANNER = "ac.query.maxranges";
+
+ public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers";
+
+ public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush";
+
+ public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size";
+ public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d.";
+ public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name";
+ public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass";
+ public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority";
+ public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize";
+ public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name";
+ public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value";
+
+ public AccumuloRdfConfiguration() {
+ super();
+ }
+
+ public AccumuloRdfConfiguration(Configuration other) {
+ super(other);
+ }
+
+ @Override
+ public AccumuloRdfConfiguration clone() {
+ return new AccumuloRdfConfiguration(this);
+ }
+
+ public Authorizations getAuthorizations() {
+ String[] auths = getAuths();
+ if (auths == null || auths.length == 0)
+ return AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+ return new Authorizations(auths);
+ }
+
+ public void setMaxRangesForScanner(Integer max) {
+ setInt(MAXRANGES_SCANNER, max);
+ }
+
+ public Integer getMaxRangesForScanner() {
+ return getInt(MAXRANGES_SCANNER, 2);
+ }
+
+ public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) {
+ List<String> strs = Lists.newArrayList();
+ for (Class<? extends AccumuloIndexer> ai : indexers){
+ strs.add(ai.getName());
+ }
+
+ setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{}));
+ }
+
+ public List<AccumuloIndexer> getAdditionalIndexers() {
+ return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class);
+ }
+ public boolean flushEachUpdate(){
+ return getBoolean(CONF_FLUSH_EACH_UPDATE, true);
+ }
+
+ public void setFlush(boolean flush){
+ setBoolean(CONF_FLUSH_EACH_UPDATE, flush);
+ }
+
+ public void setAdditionalIterators(IteratorSetting... additionalIterators){
+ //TODO do we need to worry about cleaning up
+ this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length));
+ int i = 0;
+ for(IteratorSetting iterator : additionalIterators) {
+ this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName());
+ this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass());
+ this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority()));
+ Map<String, String> options = iterator.getOptions();
+
+ this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size()));
+ Iterator<Entry<String, String>> it = options.entrySet().iterator();
+ int j = 0;
+ while(it.hasNext()) {
+ Entry<String, String> item = it.next();
+ this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey());
+ this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue());
+ j++;
+ }
+ i++;
+ }
+ }
+
+ public IteratorSetting[] getAdditionalIterators(){
+ int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0"));
+ if(size == 0) {
+ return new IteratorSetting[0];
+ }
+
+ IteratorSetting[] settings = new IteratorSetting[size];
+ for(int i = 0; i < size; i++) {
+ String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i));
+ String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i));
+ int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i)));
+
+ int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i)));
+ Map<String, String> options = new HashMap<String, String>(optionsSize);
+ for(int j = 0; j < optionsSize; j++) {
+ String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j));
+ String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j));
+ options.put(key, value);
+ }
+ settings[i] = new IteratorSetting(priority, name, iteratorClass, options);
+ }
+
+ return settings;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java
new file mode 100644
index 0000000..1ec57a7
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java
@@ -0,0 +1,40 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+
+/**
+ * Interface AccumuloRdfConstants
+ * Date: Mar 1, 2012
+ * Time: 7:24:52 PM
+ */
+public interface AccumuloRdfConstants {
+ public static final Authorizations ALL_AUTHORIZATIONS = Constants.NO_AUTHS;
+
+ public static final Value EMPTY_VALUE = new Value(new byte[0]);
+
+ public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java
new file mode 100644
index 0000000..a3e0677
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java
@@ -0,0 +1,173 @@
+package mvm.rya.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreStatement;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RdfDAOException;
+import mvm.rya.api.persist.RdfEvalStatsDAO;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Value;
+
+/**
+ * Class CloudbaseRdfEvalStatsDAO
+ * Date: Feb 28, 2012
+ * Time: 5:03:16 PM
+ */
+public class AccumuloRdfEvalStatsDAO implements RdfEvalStatsDAO<AccumuloRdfConfiguration> {
+
+ private boolean initialized = false;
+ private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+ private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>();
+ private Connector connector;
+
+ // private String evalTable = TBL_EVAL;
+ private TableLayoutStrategy tableLayoutStrategy;
+
+ @Override
+ public void init() throws RdfDAOException {
+ try {
+ if (isInitialized()) {
+ throw new IllegalStateException("Already initialized");
+ }
+ checkNotNull(connector);
+ tableLayoutStrategy = conf.getTableLayoutStrategy();
+// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable);
+// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable);
+
+ TableOperations tos = connector.tableOperations();
+ AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval());
+// boolean tableExists = tos.exists(evalTable);
+// if (!tableExists)
+// tos.create(evalTable);
+ initialized = true;
+ } catch (Exception e) {
+ throw new RdfDAOException(e);
+ }
+ }
+
+
+ @Override
+ public void destroy() throws RdfDAOException {
+ if (!isInitialized()) {
+ throw new IllegalStateException("Not initialized");
+ }
+ initialized = false;
+ }
+
+ @Override
+ public boolean isInitialized() throws RdfDAOException {
+ return initialized;
+ }
+
+ public Connector getConnector() {
+ return connector;
+ }
+
+ public void setConnector(Connector connector) {
+ this.connector = connector;
+ }
+
+ public AccumuloRdfConfiguration getConf() {
+ return conf;
+ }
+
+ public void setConf(AccumuloRdfConfiguration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public double getCardinality(AccumuloRdfConfiguration conf,
+ mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+ List<Value> val, Resource context) throws RdfDAOException {
+ try {
+ Authorizations authorizations = conf.getAuthorizations();
+ Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations);
+ Text cfTxt = null;
+ if (CARDINALITY_OF.SUBJECT.equals(card)) {
+ cfTxt = SUBJECT_CF_TXT;
+ } else if (CARDINALITY_OF.PREDICATE.equals(card)) {
+ cfTxt = PRED_CF_TXT;
+ } else if (CARDINALITY_OF.OBJECT.equals(card)) {
+// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality
+ return Double.MAX_VALUE;
+ } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) {
+ cfTxt = SUBJECTOBJECT_CF_TXT;
+ } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) {
+ cfTxt = SUBJECTPRED_CF_TXT;
+ } else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) {
+ cfTxt = PREDOBJECT_CF_TXT;
+ } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]");
+ Text cq = EMPTY_TEXT;
+ if (context != null) {
+ cq = new Text(context.stringValue().getBytes());
+ }
+ scanner.fetchColumn(cfTxt, cq);
+ Iterator<Value> vals = val.iterator();
+ String compositeIndex = vals.next().stringValue();
+ while (vals.hasNext()){
+ compositeIndex += DELIM + vals.next().stringValue();
+ }
+ scanner.setRange(new Range(new Text(compositeIndex.getBytes())));
+ Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter = scanner.iterator();
+ if (iter.hasNext()) {
+ return Double.parseDouble(new String(iter.next().getValue().get()));
+ }
+ } catch (Exception e) {
+ throw new RdfDAOException(e);
+ }
+
+ //default
+ return -1;
+ }
+
+ @Override
+ public double getCardinality(AccumuloRdfConfiguration conf,
+ mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+ List<Value> val) throws RdfDAOException {
+ return getCardinality(conf, card, val, null);
+ }
+}