You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 12:50:27 UTC
[2/6] incubator-rya git commit: Consolidated MapReduce API and
applications into toplevel project.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
new file mode 100644
index 0000000..cdc3235
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
@@ -0,0 +1,256 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.CompareToBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * {@link WritableComparable} wrapper that lets Rya data travel through Hadoop.
+ * Each instance holds a single {@link RyaStatement}, which in turn represents
+ * a statement as a collection of {@link mvm.rya.api.domain.RyaURI} and
+ * {@link mvm.rya.api.domain.RyaType} objects.
+ * <p>
+ * Like every {@link org.apache.hadoop.io.Writable}, this class is mutable.
+ * When it is used as Mapper or Reducer input, the Hadoop framework will
+ * typically reuse one instance and load successive records into it. Each
+ * call to {@link #readFields(DataInput)} replaces the wrapped RyaStatement
+ * with a newly deserialized one, so code that needs to hold on to a statement
+ * should keep the RyaStatement obtained from {@link #getRyaStatement()}
+ * rather than the writable itself.
+ */
+public class RyaStatementWritable implements WritableComparable<RyaStatementWritable> {
+    private RyaTripleContext ryaContext;
+    private RyaStatement ryaStatement;
+
+    /**
+     * Creates an empty writable backed by the default RyaTripleContext
+     * (the one obtained from a fresh {@link AccumuloRdfConfiguration}).
+     */
+    public RyaStatementWritable() {
+        this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
+    }
+
+    /**
+     * Creates an empty writable backed by the default RyaTripleContext.
+     * @param conf Unused; the default context is always used.
+     */
+    public RyaStatementWritable(Configuration conf) {
+        this();
+    }
+
+    /**
+     * Creates an empty writable that uses the supplied context.
+     * @param ryaContext Context used for reading and writing the statement.
+     */
+    public RyaStatementWritable(RyaTripleContext ryaContext) {
+        this.ryaContext = ryaContext;
+    }
+
+    /**
+     * Creates a writable wrapping the given statement and using the supplied
+     * context.
+     * @param ryaStatement The statement (triple) represented by this object.
+     * @param ryaContext Context used for reading and writing the statement.
+     */
+    public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) {
+        this.ryaContext = ryaContext;
+        this.ryaStatement = ryaStatement;
+    }
+
+    /**
+     * Gets the wrapped RyaStatement.
+     * @return The statement currently held by this writable; may be null.
+     */
+    public RyaStatement getRyaStatement() {
+        return ryaStatement;
+    }
+
+    /**
+     * Replaces the wrapped RyaStatement.
+     * @param ryaStatement The statement this writable should represent.
+     */
+    public void setRyaStatement(RyaStatement ryaStatement) {
+        this.ryaStatement = ryaStatement;
+    }
+
+    /**
+     * Natural ordering. Nullness of the wrapped statements is compared first;
+     * when both statements are present they are compared by subject,
+     * predicate, object and context, and then by qualifier, column
+     * visibility, value and timestamp.
+     * @param other The writable to compare against; must not be null.
+     * @return Zero if both writables hold equivalent statements or both hold
+     *         null; otherwise an integer whose sign gives a consistent order.
+     * @throws NullPointerException if {@code other} is null, as the
+     *         Comparable contract requires.
+     */
+    @Override
+    public int compareTo(RyaStatementWritable other) {
+        final RyaStatement mine = this.getRyaStatement();
+        final RyaStatement theirs = other.getRyaStatement(); // NPE here when other is null
+        final CompareToBuilder comparison = new CompareToBuilder()
+                .append(mine == null, theirs == null);
+        if (mine == null || theirs == null) {
+            // Nothing further to compare; the nullness flags decide the result.
+            return comparison.toComparison();
+        }
+        return comparison
+                .append(mine.getSubject(), theirs.getSubject())
+                .append(mine.getPredicate(), theirs.getPredicate())
+                .append(mine.getObject(), theirs.getObject())
+                .append(mine.getContext(), theirs.getContext())
+                .append(mine.getQualifer(), theirs.getQualifer())
+                .append(mine.getColumnVisibility(), theirs.getColumnVisibility())
+                .append(mine.getValue(), theirs.getValue())
+                .append(mine.getTimestamp(), theirs.getTimestamp())
+                .toComparison();
+    }
+
+    /**
+     * Delegates to {@link RyaStatement#hashCode()}.
+     * @return The wrapped statement's hash, or 0 when no statement is set.
+     */
+    @Override
+    public int hashCode() {
+        return ryaStatement == null ? 0 : ryaStatement.hashCode();
+    }
+
+    /**
+     * Delegates to {@link RyaStatement#equals(Object)}.
+     * @param o Object to compare with.
+     * @return true if {@code o} is a RyaStatementWritable whose statement is
+     *         equal to this one's, or if both hold no statement.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof RyaStatementWritable)) {
+            // instanceof is false for null, so this also rejects null.
+            return false;
+        }
+        final RyaStatement mine = this.getRyaStatement();
+        final RyaStatement theirs = ((RyaStatementWritable) o).getRyaStatement();
+        return mine == null ? theirs == null : mine.equals(theirs);
+    }
+
+    /**
+     * Serializes the wrapped statement. The stream receives, in order: the
+     * row, column family and column qualifier of the statement's SPO-layout
+     * {@link TripleRow}, then the column visibility, the value, and finally a
+     * presence flag followed (if set) by the timestamp.
+     * @param dataOutput An output stream for serialized statement data.
+     * @throws IOException if the RyaStatement is null or otherwise can't be
+     *         serialized.
+     */
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        if (ryaStatement == null) {
+            throw new IOException("Rya Statement is null");
+        }
+        try {
+            final Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> rowsByLayout =
+                    ryaContext.serializeTriple(ryaStatement);
+            final TripleRow spoRow = rowsByLayout.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
+            final byte[] rowBytes = spoRow.getRow();
+            final byte[] familyBytes = spoRow.getColumnFamily();
+            final byte[] qualifierBytes = spoRow.getColumnQualifier();
+            write(dataOutput, rowBytes);
+            write(dataOutput, familyBytes);
+            write(dataOutput, qualifierBytes);
+            write(dataOutput, ryaStatement.getColumnVisibility());
+            write(dataOutput, ryaStatement.getValue());
+            final Long timestamp = ryaStatement.getTimestamp();
+            dataOutput.writeBoolean(timestamp != null);
+            if (timestamp != null) {
+                dataOutput.writeLong(timestamp);
+            }
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Writes one nullable byte-array field: a presence flag, then (only when
+     * the field is non-null) its length and its bytes.
+     * @param dataOutput Stream for writing serialized statements.
+     * @param row Individual field to write, as a byte array; may be null.
+     * @throws IOException if writing to the stream fails.
+     */
+    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
+        if (row == null) {
+            dataOutput.writeBoolean(false);
+            return;
+        }
+        dataOutput.writeBoolean(true);
+        dataOutput.writeInt(row.length);
+        dataOutput.write(row);
+    }
+
+    /**
+     * Reads one nullable byte-array field in the format produced by
+     * {@link #write(DataOutput, byte[])}.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, or null if it was written as absent.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (!dataInput.readBoolean()) {
+            return null;
+        }
+        final byte[] field = new byte[dataInput.readInt()];
+        dataInput.readFully(field);
+        return field;
+    }
+
+    /**
+     * Loads this writable from a stream in the format produced by
+     * {@link #write(DataOutput)}. A new RyaStatement is deserialized and
+     * becomes the wrapped statement.
+     * @param dataInput A stream containing serialized statement data.
+     * @throws IOException if reading fails or the triple can't be resolved.
+     */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        // Field order must mirror write(DataOutput).
+        final byte[] rowBytes = read(dataInput);
+        final byte[] familyBytes = read(dataInput);
+        final byte[] qualifierBytes = read(dataInput);
+        final byte[] visibilityBytes = read(dataInput);
+        final byte[] valueBytes = read(dataInput);
+        final Long timestamp = dataInput.readBoolean() ? Long.valueOf(dataInput.readLong()) : null;
+        try {
+            final TripleRow spoRow = new TripleRow(rowBytes, familyBytes, qualifierBytes);
+            ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spoRow);
+            ryaStatement.setColumnVisibility(visibilityBytes);
+            ryaStatement.setValue(valueBytes);
+            ryaStatement.setTimestamp(timestamp);
+        } catch (TripleRowResolverException e) {
+            throw new IOException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
new file mode 100644
index 0000000..bc3af58
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/examples/TextOutputExample.java
@@ -0,0 +1,196 @@
+package mvm.rya.accumulo.mr.examples;
+
+import java.io.BufferedReader;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.Date;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFWriter;
+import org.openrdf.rio.Rio;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+
+/**
+ * Example of using a MapReduce tool to get triples from a Rya instance and serialize them to a text file as RDF.
+ */
+public class TextOutputExample extends AbstractAccumuloMRTool {
+ private static Logger logger = Logger.getLogger(TextOutputExample.class);
+ private static RDFFormat rdfFormat = RDFFormat.NQUADS;
+ private static String tempDir;
+
+ // Connection information
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private static final String INSTANCE_NAME = "instanceName";
+ private static final String PREFIX = "rya_example_";
+
+ public static void main(String[] args) throws Exception {
+ setUpRya();
+ TextOutputExample tool = new TextOutputExample();
+ ToolRunner.run(new Configuration(), tool, args);
+ }
+
+ static void setUpRya() throws AccumuloException, AccumuloSecurityException, RyaDAOException {
+ MockInstance mock = new MockInstance(INSTANCE_NAME);
+ Connector conn = mock.getConnector(USERNAME, new PasswordToken(PASSWORD));
+ AccumuloRyaDAO dao = new AccumuloRyaDAO();
+ dao.setConnector(conn);
+ AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ conf.setTablePrefix(PREFIX);
+ dao.setConf(conf);
+ dao.init();
+ String ns = "http://example.com/";
+ dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p1"), new RyaURI(ns+"o1")));
+ dao.add(new RyaStatement(new RyaURI(ns+"s1"), new RyaURI(ns+"p2"), new RyaURI(ns+"o2")));
+ dao.add(new RyaStatement(new RyaURI(ns+"s2"), new RyaURI(ns+"p1"), new RyaURI(ns+"o3"),
+ new RyaURI(ns+"g1")));
+ dao.add(new RyaStatement(new RyaURI(ns+"s3"), new RyaURI(ns+"p3"), new RyaURI(ns+"o3"),
+ new RyaURI(ns+"g2")));
+ dao.destroy();
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ logger.info("Configuring tool to connect to mock instance...");
+ MRUtils.setACUserName(conf, USERNAME);
+ MRUtils.setACPwd(conf, PASSWORD);
+ MRUtils.setACInstance(conf, INSTANCE_NAME);
+ MRUtils.setACMock(conf, true);
+ MRUtils.setTablePrefix(conf, PREFIX);
+
+ logger.info("Initializing tool and checking configuration...");
+ init();
+
+ logger.info("Creating Job, setting Mapper class, and setting no Reducer...");
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(TextOutputExample.class);
+ job.setMapperClass(RyaToRdfMapper.class);
+ job.setNumReduceTasks(0);
+
+ logger.info("Configuring Job to take input from the mock Rya instance...");
+ setupRyaInput(job);
+
+ logger.info("Configuring Job to output Text to a new temporary directory...");
+ tempDir = Files.createTempDirectory("rya-mr-example").toString();
+ Path outputPath = new Path(tempDir, "rdf-output");
+ job.setOutputFormatClass(TextOutputFormat.class);
+ TextOutputFormat.setOutputPath(job, outputPath);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ Date start = new Date();
+ logger.info("Starting Job at: start");
+ boolean success = job.waitForCompletion(true);
+
+ if (!success) {
+ System.out.println("Job Failed!!!");
+ return 1;
+ }
+
+ Date end = new Date();
+ logger.info("Job ended: " + end);
+ logger.info("The job took " + (end.getTime() - start.getTime()) / 1000 + " seconds.");
+ // Print output and then delete temp files:
+ java.nio.file.Path tempPath = FileSystems.getDefault().getPath(tempDir);
+ for (java.nio.file.Path subdir : Files.newDirectoryStream(tempPath)) {
+ logger.info("");
+ logger.info("Output files:");
+ for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) {
+ logger.info("\t" + outputFile);
+ }
+ for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir, "part*")) {
+ logger.info("");
+ logger.info("Contents of " + outputFile + ":");
+ BufferedReader reader = Files.newBufferedReader(outputFile, Charset.defaultCharset());
+ String line;
+ while ((line = reader.readLine()) != null) {
+ logger.info("\t" + line);
+ }
+ reader.close();
+ }
+ for (java.nio.file.Path outputFile : Files.newDirectoryStream(subdir)) {
+ Files.deleteIfExists(outputFile);
+ }
+ Files.deleteIfExists(subdir);
+ }
+ Files.deleteIfExists(tempPath);
+ logger.info("");
+ logger.info("Temporary directory " + tempDir + " deleted.");
+
+ return 0;
+ }
+
+ static class RyaToRdfMapper extends Mapper<Text, RyaStatementWritable, NullWritable, Text> {
+ Text textOut = new Text();
+ @Override
+ protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
+ // receives a RyaStatementWritable; convert to a Statement
+ RyaStatement rstmt = value.getRyaStatement();
+ Statement st = RyaToRdfConversions.convertStatement(rstmt);
+ logger.info("Mapper receives: " + rstmt);
+ // then convert to an RDF string
+ StringWriter writer = new StringWriter();
+ try {
+ RDFWriter rdfWriter = Rio.createWriter(rdfFormat, writer);
+ rdfWriter.startRDF();
+ rdfWriter.handleStatement(st);
+ rdfWriter.endRDF();
+ } catch (RDFHandlerException e) {
+ throw new IOException("Error writing RDF data", e);
+ }
+ // Write the string to the output
+ String line = writer.toString().trim();
+ logger.info("Serialized to RDF: " + line);
+ textOut.set(line);
+ context.write(NullWritable.get(), textOut);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
new file mode 100644
index 0000000..ee4e00b
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
@@ -0,0 +1,258 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import java.io.IOException;
+import java.util.Date;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+/**
+ * MapReduce tool that counts how often each subject and each predicate
+ * occurs (per context, when one is present) and writes the counts to the
+ * evaluation table.
+ * Class RdfCloudTripleStoreCountTool
+ * Date: Apr 12, 2011
+ * Time: 10:39:40 AM
+ * @deprecated
+ */
+public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
+
+    /**
+     * Command-line entry point. Any exception raised by the tool is printed
+     * to standard error; the tool's exit code is not propagated.
+     * @param args Passed through to {@link ToolRunner}.
+     */
+    public static void main(String[] args) {
+        try {
+            final AccumuloRdfCountTool tool = new AccumuloRdfCountTool();
+            ToolRunner.run(new Configuration(), tool, args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Configures and runs the counting job: Accumulo input restricted to a
+     * single row range, the mapper/combiner/reducer defined in this class,
+     * and Accumulo output to the evaluation table for the configured prefix.
+     * @param strings Unused.
+     * @return 0 if the job succeeded, -1 otherwise.
+     * @throws Exception if job setup or execution fails.
+     */
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
+
+        // Let the base tool read its settings before the job is built.
+        init();
+
+        final Job job = new Job(conf);
+        job.setJarByClass(AccumuloRdfCountTool.class);
+        setupAccumuloInput(job);
+
+        // Scan from the empty row up to the single-byte row Byte.MAX_VALUE.
+        final Text firstRow = new Text(new byte[]{});
+        final Text lastRow = new Text(new byte[]{Byte.MAX_VALUE});
+        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(firstRow, lastRow)));
+
+        // Intermediate and final key/value types.
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        // Processing stages.
+        job.setMapperClass(CountPiecesMapper.class);
+        job.setCombinerClass(CountPiecesCombiner.class);
+        job.setReducerClass(CountPiecesReducer.class);
+
+        final String evalTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
+        setupAccumuloOutput(job, evalTable);
+
+        // Run the job and report timing.
+        final Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        if (!job.waitForCompletion(true)) {
+            System.out.println("Job Failed!!!");
+            return -1;
+        }
+
+        final Date endTime = new Date();
+        System.out.println("Job ended: " + endTime);
+        System.out.println("The job took "
+                + (endTime.getTime() - startTime.getTime()) / 1000
+                + " seconds.");
+        return 0;
+    }
+
+    /**
+     * Emits a count of 1 for the subject and for the predicate of every
+     * triple read from Accumulo. Each output key encodes the value, the
+     * column family naming its role, and an optional context.
+     */
+    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {
+
+        public static final byte[] EMPTY_BYTES = new byte[0];
+        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+
+        ValueFactoryImpl vf = new ValueFactoryImpl();
+
+        private Text keyOut = new Text();
+        private LongWritable valOut = new LongWritable(1);
+        private RyaTripleContext ryaContext;
+
+        /**
+         * Reads the table layout (OSP unless configured otherwise) and builds
+         * the triple context from the job configuration.
+         */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            final Configuration jobConf = context.getConfiguration();
+            final String layoutName = jobConf.get(MRUtils.TABLE_LAYOUT_PROP,
+                    RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString());
+            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(layoutName);
+            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(jobConf));
+        }
+
+        /**
+         * Deserializes the triple stored under {@code key} and emits one
+         * record for its subject and one for its predicate.
+         * @throws IOException if the row cannot be resolved to a triple or
+         *         output fails.
+         */
+        @Override
+        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+            try {
+                final TripleRow tripleRow = new TripleRow(key.getRow().getBytes(),
+                        key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes());
+                final RyaStatement statement = ryaContext.deserializeTriple(tableLayout, tripleRow);
+
+                // Resolve every piece up front so nothing is emitted for a malformed statement.
+                final String subject = statement.getSubject().getData();
+                final String predicate = statement.getPredicate().getData();
+                final RyaURI graph = statement.getContext();
+                final boolean hasGraph = graph != null;
+                final String graphData = hasGraph ? graph.getData() : null;
+
+                emitCount(context, subject, RdfCloudTripleStoreConstants.SUBJECT_CF, hasGraph, graphData);
+                emitCount(context, predicate, RdfCloudTripleStoreConstants.PRED_CF, hasGraph, graphData);
+            } catch (TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Encodes (data, column family, context flag, optional context) as
+         * the output key and writes it with the constant count of 1.
+         */
+        private void emitCount(Context context, String data, String columnFamily,
+                boolean hasGraph, String graphData) throws IOException, InterruptedException {
+            final ByteArrayDataOutput encoded = ByteStreams.newDataOutput();
+            encoded.writeUTF(data);
+            encoded.writeUTF(columnFamily);
+            encoded.writeBoolean(hasGraph);
+            if (hasGraph) {
+                encoded.writeUTF(graphData);
+            }
+            keyOut.set(encoded.toByteArray());
+            context.write(keyOut, valOut);
+        }
+    }
+
+    /**
+     * Sums the counts seen for a key and forwards the sum only when it
+     * exceeds {@link #TOO_LOW}.
+     * <p>
+     * NOTE(review): because this runs as a combiner on partial data, sums at
+     * or below the threshold are dropped before the reducer sees them, so a
+     * key spread thinly over many map outputs can be under-counted. The
+     * original TODO below acknowledges this; confirm it is an accepted
+     * approximation.
+     */
+    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+        private LongWritable valOut = new LongWritable();
+
+        // TODO: can still add up to be large I guess
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 2;
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long total = 0;
+            for (LongWritable partial : values) {
+                total += partial.get();
+            }
+
+            if (total > TOO_LOW) {
+                valOut.set(total);
+                context.write(key, valOut);
+            }
+        }
+
+    }
+
+    /**
+     * Sums the counts for a key and, when the total exceeds
+     * {@link #TOO_LOW}, writes it to the evaluation table as a mutation whose
+     * row is the counted value, whose column family names its role, and whose
+     * column qualifier is the context (or empty).
+     */
+    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {
+
+        Text row = new Text();
+        Text cat_txt = new Text();
+        Value v_out = new Value();
+        ValueFactory vf = new ValueFactoryImpl();
+
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 10;
+        private String tablePrefix;
+        protected Text table;
+        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
+
+        /**
+         * Resolves the output table name from the configured prefix and, if
+         * a column visibility is configured, uses it instead of the empty one.
+         */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            final Configuration jobConf = context.getConfiguration();
+            tablePrefix = jobConf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            final String visibility = jobConf.get(MRUtils.AC_CV_PROP);
+            if (visibility != null) {
+                cv = new ColumnVisibility(visibility);
+            }
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long total = 0;
+            for (LongWritable partial : values) {
+                total += partial.get();
+            }
+
+            if (total <= TOO_LOW) {
+                return;
+            }
+
+            // Decode the key in the same order the mapper encoded it.
+            final ByteArrayDataInput encoded = ByteStreams.newDataInput(key.getBytes());
+            final String countedValue = encoded.readUTF();
+            cat_txt.set(encoded.readUTF());
+            final Text columnQualifier = encoded.readBoolean()
+                    ? new Text(encoded.readUTF())
+                    : RdfCloudTripleStoreConstants.EMPTY_TEXT;
+
+            row.set(countedValue);
+            final Mutation mutation = new Mutation(row);
+            v_out.set(Long.toString(total).getBytes());
+            mutation.put(cat_txt, columnQualifier, cv, v_out);
+            context.write(table, mutation);
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
new file mode 100644
index 0000000..7857e45
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/RdfFileInputTool.java
@@ -0,0 +1,91 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.rio.RDFFormat;
+
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+
+/**
+ * Reads RDF data from one or more file(s) and inserts statements into Rya.
+ * <p>
+ * Uses {@link mvm.rya.accumulo.mr.RdfFileInputFormat} to read data.
+ * <p>
+ * Takes one argument: the file or directory to read (from HDFS). The
+ * argument may be omitted when the input path is supplied through the
+ * configuration property named by {@link MRUtils#INPUT_PATH}; if both are
+ * given, the configured value is used.
+ * <p>
+ * Expects configuration:
+ * <p>
+ * - RDF format, named by parameter "rdf.format"; see {@link RDFFormat}.
+ *   Defaults to rdf/xml. If using multiple files, all must be the same format.
+ * <p>
+ * - Accumulo and Rya configuration parameters as named in {@link MRUtils}
+ *   (username, password, instance name, zookeepers, and Rya prefix)
+ * <p>
+ * - Indexing configuration parameters as named in
+ *   {@link mvm.rya.indexing.accumulo.ConfigUtils} (enable or disable freetext,
+ *   geo, temporal, and entity indexing, and specify predicates for each
+ *   indexer). If not given, no secondary indexing is done.
+ */
+public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool {
+    /**
+     * Command-line entry point. Any exception raised by the tool is printed
+     * to standard error; the tool's exit code is not propagated.
+     * @param args Passed through to {@link ToolRunner}.
+     */
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Configures and runs a map-only job that reads RDF from the input path
+     * and writes the statements to Rya, then reports timing and the number
+     * of map output records.
+     * @param args Optional; {@code args[0]} is the input path when it is not
+     *        set in the configuration.
+     * @return 0 if the job succeeded, 1 otherwise.
+     * @throws IllegalArgumentException if no input path is configured and no
+     *         argument is given.
+     * @throws Exception if job setup or execution fails.
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        init();
+        Job job = Job.getInstance(conf, "Rdf File Input");
+        job.setJarByClass(RdfFileInputTool.class);
+
+        String inputPath = resolveInputPath(args);
+        setupFileInput(job, inputPath, RDFFormat.RDFXML);
+        setupRyaOutput(job);
+        job.setNumReduceTasks(0);
+
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            long n = job.getCounters()
+                    .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue();
+            System.out.println(n + " statement(s) inserted to Rya.");
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+        return exitCode;
+    }
+
+    /**
+     * Picks the input path: the configured value wins, otherwise the first
+     * command-line argument. The argument array is only consulted when the
+     * configuration has no value, so running with a configured path and no
+     * arguments no longer fails with an ArrayIndexOutOfBoundsException.
+     */
+    private String resolveInputPath(String[] args) {
+        String configured = conf.get(MRUtils.INPUT_PATH);
+        if (configured != null) {
+            return configured;
+        }
+        if (args == null || args.length == 0) {
+            throw new IllegalArgumentException("No input path given: set the \"" + MRUtils.INPUT_PATH
+                    + "\" configuration property or pass the file or directory as the first argument.");
+        }
+        return args[0];
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
new file mode 100644
index 0000000..d713d85
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/tools/Upgrade322Tool.java
@@ -0,0 +1,241 @@
+package mvm.rya.accumulo.mr.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.MRUtils;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.calrissian.mango.types.LexiTypeEncoders;
+import org.calrissian.mango.types.TypeEncoder;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+
+/**
+ * MapReduce tool, run under the job name "Upgrade to Rya 3.2.2", that
+ * re-encodes the object part of existing rows. It scans the OSP table and,
+ * for every row whose type marker is handled by
+ * {@link UpgradeObjectSerialization}, emits mutations that delete the old row
+ * and insert the re-encoded row, addressed to the OSP, PO and SPO tables.
+ */
+public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool {
+    /**
+     * Configures and runs the upgrade job.
+     *
+     * @param strings command-line arguments (not used by this method)
+     * @return 0 if the job completed successfully, 1 otherwise
+     * @throws Exception if job setup or execution fails
+     */
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
+        init();
+
+        // Job.getInstance replaces the deprecated Job constructor.
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(Upgrade322Tool.class);
+
+        setupAccumuloInput(job);
+        final String tablePrefix = MRUtils.getTablePrefix(conf);
+        AccumuloInputFormat.setInputTableName(job, tablePrefix + TBL_OSP_SUFFIX);
+
+        //we do not need to change any row that is a string, custom, or uri type
+        // NOTE(review): this iterator is configured but never attached to the
+        // job (e.g. via AccumuloInputFormat.addIterator), so it currently has
+        // no effect and every row reaches the mapper. Confirm the row regex
+        // matches the intended rows before enabling it.
+        IteratorSetting regex = new IteratorSetting(30, "regex",
+                RegExFilter.class);
+        RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false);
+        RegExFilter.setNegate(regex, true);
+
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Mutation.class);
+
+        setupAccumuloOutput(job, tablePrefix + TBL_SPO_SUFFIX);
+
+        // set mapper and reducer classes
+        job.setMapperClass(Upgrade322Mapper.class);
+        job.setReducerClass(Reducer.class);
+
+        // Submit the job
+        return job.waitForCompletion(true) ? 0 : 1;
+    }
+
+    /**
+     * Command-line entry point: runs this tool through {@link ToolRunner}.
+     * If the tool throws, or {@link #run(String[])} reports a non-zero result,
+     * the JVM is terminated with a non-zero exit status.
+     *
+     * @param args command-line arguments handed to {@link ToolRunner}
+     */
+    public static void main(String[] args) {
+        int exitCode;
+        try {
+            exitCode = ToolRunner.run(new Configuration(), new Upgrade322Tool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+            exitCode = 1;
+        }
+        // Only force an exit on failure; a successful run returns normally.
+        if (exitCode != 0) {
+            System.exit(exitCode);
+        }
+    }
+
+    /**
+     * Reading from the OSP table. For each row that can be upgraded, writes
+     * delete mutations for the old serialization and put mutations for the
+     * new serialization, keyed by the name of the table each one targets.
+     */
+    public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> {
+
+        private String tablePrefix;
+        private Text spoTable;
+        private Text poTable;
+        private Text ospTable;
+
+        private final UpgradeObjectSerialization upgradeObjectSerialization;
+
+        /** Creates a mapper that uses a default {@link UpgradeObjectSerialization}. */
+        public Upgrade322Mapper() {
+            this(new UpgradeObjectSerialization());
+        }
+
+        /**
+         * Creates a mapper that uses the given serialization upgrader.
+         *
+         * @param upgradeObjectSerialization converts an old object
+         *        serialization into the new one
+         */
+        public Upgrade322Mapper(
+                UpgradeObjectSerialization upgradeObjectSerialization) {
+            this.upgradeObjectSerialization = upgradeObjectSerialization;
+        }
+
+        /**
+         * Reads the table prefix from the job configuration (falling back to
+         * {@code TBL_PRFX_DEF}) and derives the three output table names.
+         */
+        @Override
+        protected void setup(
+                Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            tablePrefix = context.getConfiguration().get(
+                    MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
+            spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
+            poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
+            ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
+        }
+
+        /**
+         * Splits one OSP row into object, subject, predicate and type marker,
+         * upgrades the object serialization, and emits the delete and put
+         * mutations for all three tables. Rows whose type marker is not
+         * handled by the upgrader produce no output.
+         */
+        @Override
+        protected void map(
+                Key key, Value value, Context context)
+                throws IOException, InterruptedException {
+
+            //read the key, expect OSP:
+            //  object DELIM subject DELIM predicate TYPE_DELIM typeMarker
+            final String row = key.getRow().toString();
+            final int firstDelim = row.indexOf(DELIM);
+            final int secondDelim = row.indexOf(DELIM, firstDelim + 1);
+            final int typeDelim = row.lastIndexOf(TYPE_DELIM);
+            final String oldSerialization = row.substring(0, firstDelim);
+            final char typeMarker = row.charAt(row.length() - 1);
+
+            final String subject = row.substring(firstDelim + 1, secondDelim);
+            final String predicate = row.substring(secondDelim + 1, typeDelim);
+            final String typeSuffix = TYPE_DELIM + typeMarker;
+
+            final String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker);
+            if (newSerialization == null) {
+                // type does not need re-encoding
+                return;
+            }
+
+            //delete Mutations for the old serialization
+            final Mutation deleteOsp = withDelete(new Mutation(key.getRow()), key);
+            final Mutation deletePo = withDelete(
+                    new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix), key);
+            final Mutation deleteSpo = withDelete(
+                    new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix), key);
+
+            //put Mutations for the new serialization
+            final Mutation putOsp = withPut(
+                    new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix), key, value);
+            final Mutation putPo = withPut(
+                    new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix), key, value);
+            final Mutation putSpo = withPut(
+                    new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix), key, value);
+
+            //write out deletes to all tables
+            context.write(ospTable, deleteOsp);
+            context.write(poTable, deletePo);
+            context.write(spoTable, deleteSpo);
+
+            //write out inserts to all tables
+            context.write(ospTable, putOsp);
+            context.write(poTable, putPo);
+            context.write(spoTable, putSpo);
+        }
+
+        /** Adds a delete of the column identified by {@code key} to {@code mutation} and returns it. */
+        private static Mutation withDelete(Mutation mutation, Key key) {
+            mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
+                    key.getColumnVisibilityParsed());
+            return mutation;
+        }
+
+        /** Adds {@code value} under the column and timestamp of {@code key} to {@code mutation} and returns it. */
+        private static Mutation withPut(Mutation mutation, Key key, Value value) {
+            mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
+                    key.getColumnVisibilityParsed(), key.getTimestamp(), value);
+            return mutation;
+        }
+    }
+
+    /**
+     * Converts the old string serialization of a typed object into the
+     * encoding produced by the {@link LexiTypeEncoders} encoders.
+     */
+    public static class UpgradeObjectSerialization {
+
+        public static final TypeEncoder<Boolean, String>
+                BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder();
+        public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER
+                = LexiTypeEncoders.byteEncoder();
+        public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER
+                = LexiTypeEncoders.dateEncoder();
+        public static final TypeEncoder<Integer, String>
+                INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder();
+        public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER
+                = LexiTypeEncoders.longEncoder();
+        public static final TypeEncoder<Double, String>
+                DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder();
+
+        /**
+         * Re-encodes one object value.
+         *
+         * @param object     the old serialization of the object
+         * @param typeMarker the numeric type marker taken from the end of the row
+         * @return the new serialization, or {@code null} if the type marker is
+         *         not one of the handled types (boolean, byte, long, int,
+         *         double, datetime)
+         */
+        public String upgrade(String object, int typeMarker) {
+            switch (typeMarker) {
+                case 10: //boolean
+                    return BOOLEAN_STRING_TYPE_ENCODER.encode(Boolean.parseBoolean(object));
+                case 9: //byte
+                    return BYTE_STRING_TYPE_ENCODER.encode(Byte.parseByte(object));
+                case 4: //long
+                    return LONG_STRING_TYPE_ENCODER.encode(Long.parseLong(object));
+                case 5: //int
+                    return INTEGER_STRING_TYPE_ENCODER.encode(Integer.parseInt(object));
+                case 6: { //double
+                    // Old layout as parsed here: char 0 is the value sign,
+                    // char 1 the exponent sign, chars 2-4 a three-digit
+                    // exponent, and everything from index 6 on the mantissa.
+                    final String exp = object.substring(2, 5);
+                    final char valueSign = object.charAt(0);
+                    final char expSign = object.charAt(1);
+                    int expInt = Integer.parseInt(exp);
+                    if (expSign == '-') {
+                        // negative exponents are stored as 999 minus the exponent
+                        expInt = 999 - expInt;
+                    }
+                    final String expDoubleStr =
+                            String.format("%s%sE%s%d", valueSign,
+                                    object.substring(6),
+                                    expSign, expInt);
+                    return DOUBLE_STRING_TYPE_ENCODER
+                            .encode(Double.parseDouble(expDoubleStr));
+                }
+                case 7: { //datetime
+                    // the old value is Long.MAX_VALUE minus the epoch milliseconds
+                    final long millis = Long.MAX_VALUE - Long.parseLong(object);
+                    return DATE_STRING_TYPE_ENCODER.encode(new Date(millis));
+                }
+                default:
+                    return null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
new file mode 100644
index 0000000..cda66bd
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RdfFileInputFormatTest.java
@@ -0,0 +1,180 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.rio.RDFFormat;
+
+import mvm.rya.api.resolver.RyaToRdfConversions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Tests for {@link RdfFileInputFormat} and its record reader, run against
+ * RDF files on the local file system.
+ */
+public class RdfFileInputFormatTest {
+    static String NT_INPUT = "src/test/resources/test.ntriples";
+    static String TRIG_INPUT = "src/test/resources/namedgraphs.trig";
+
+    Configuration conf;
+    Job job;
+    FileSystem fs;
+    RdfFileInputFormat.RdfFileRecordReader reader;
+
+    @Rule
+    public ExpectedException expected = ExpectedException.none();
+
+    /** Creates a job whose default file system is the local one. */
+    @Before
+    public void before() throws IOException {
+        conf = new Configuration();
+        // Use the FileSystem constant rather than the deprecated literal key.
+        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+        fs = FileSystem.get(conf);
+        job = Job.getInstance(conf);
+    }
+
+    /**
+     * Creates and initializes {@link #reader} over a single split covering
+     * the whole of the given file, using the job's current configuration.
+     *
+     * @param filename path of the input file, relative to the working directory
+     */
+    void init(String filename) throws IOException, InterruptedException {
+        conf = job.getConfiguration();
+        File inputFile = new File(filename);
+        Path inputPath = new Path(inputFile.getAbsoluteFile().toURI());
+        InputSplit split = new FileSplit(inputPath, 0, inputFile.length(), null);
+        TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+        reader = (RdfFileInputFormat.RdfFileRecordReader) new RdfFileInputFormat().createRecordReader(split, context);
+        reader.initialize(split, context);
+    }
+
+    /** Reads the N-Triples file and checks each statement and its 1-based key. */
+    @Test
+    public void testStatementInput() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
+        init(NT_INPUT);
+        String prefix = "urn:lubm:rdfts#";
+        URI[] gs = {
+                new URIImpl(prefix + "GraduateStudent01"),
+                new URIImpl(prefix + "GraduateStudent02"),
+                new URIImpl(prefix + "GraduateStudent03"),
+                new URIImpl(prefix + "GraduateStudent04")
+        };
+        URI hasFriend = new URIImpl(prefix + "hasFriend");
+        Statement[] statements = {
+                new StatementImpl(gs[0], hasFriend, gs[1]),
+                new StatementImpl(gs[1], hasFriend, gs[2]),
+                new StatementImpl(gs[2], hasFriend, gs[3])
+        };
+        int count = 0;
+        while (reader.nextKeyValue()) {
+            Assert.assertEquals(statements[count],
+                    RyaToRdfConversions.convertStatement(reader.getCurrentValue().getRyaStatement()));
+            count++;
+            Assert.assertEquals(count, reader.getCurrentKey().get());
+        }
+        Assert.assertEquals(3, count);
+    }
+
+    /** Reads the first statement of the TriG file and checks that its context is kept. */
+    @Test
+    public void testTrigInput() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.TRIG);
+        init(TRIG_INPUT);
+        Assert.assertTrue(reader.nextKeyValue());
+        Assert.assertEquals(1, reader.getCurrentKey().get());
+        // Named so that it does not shadow the ExpectedException rule field.
+        Statement expectedStatement = new ContextStatementImpl(
+                new URIImpl("http://www.example.org/exampleDocument#Monica"),
+                new URIImpl("http://www.example.org/vocabulary#name"),
+                new LiteralImpl("Monica Murphy"),
+                new URIImpl("http://www.example.org/exampleDocument#G1"));
+        Statement actualStatement = RyaToRdfConversions.convertStatement(
+                reader.getCurrentValue().getRyaStatement());
+        Assert.assertEquals(expectedStatement, actualStatement);
+    }
+
+    /**
+     * With a statement buffer of size 2, checks that the buffer fills up to,
+     * but never beyond, its capacity before each record is consumed, and that
+     * only the done signal remains once all three statements have been read.
+     */
+    @Test
+    public void testBlockStatementQueue() throws Exception {
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
+        RdfFileInputFormat.setStatementBufferSize(job, 2);
+        init(NT_INPUT);
+        // 3 statements in total, plus done signal: should fill up three times
+        int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill
+        int maxSeconds = 120; // timeout that should never be reached
+        for (int i = 0; i < 3; i++) {
+            long t = 0;
+            while (reader.statementCache.remainingCapacity() > 0) {
+                if (t >= (maxSeconds*1000)) {
+                    Assert.fail("Statement buffer still hasn't filled up after " + maxSeconds + " seconds.");
+                }
+                Assert.assertTrue(reader.statementCache.size() <= 2);
+                Thread.sleep(interval);
+                t += interval;
+            }
+            Assert.assertEquals(2, reader.statementCache.size());
+            Assert.assertEquals(0, reader.statementCache.remainingCapacity());
+            Assert.assertTrue(reader.nextKeyValue());
+        }
+        // Then the only thing in the queue should be the done signal
+        Assert.assertSame(RdfFileInputFormat.DONE, reader.statementCache.peek());
+        Assert.assertEquals(1, reader.statementCache.size());
+        Assert.assertFalse(reader.nextKeyValue());
+        Assert.assertTrue(reader.statementCache.isEmpty());
+    }
+
+    /**
+     * Parses the N-Triples file as RDF/XML and checks that the reader gets an
+     * error signal, throws an {@link IOException} from {@code nextKeyValue},
+     * exposes no current key or value, and leaves no worker thread alive.
+     */
+    @Test
+    public void testFailGracefully() throws Exception {
+        // Pass the wrong RDF format and make sure all threads terminate
+        int interval = 100; // ms to sleep per iteration while waiting for statement cache to fill
+        int maxSeconds = 60; // timeout that should never be reached
+        RdfFileInputFormat.setRDFFormat(job, RDFFormat.RDFXML);
+        RdfFileInputFormat.setTimeout(job, maxSeconds*2);
+        init(NT_INPUT);
+        long t = 0;
+        while (reader.statementCache.isEmpty()) {
+            if (t >= (maxSeconds*1000)) {
+                Assert.fail("Statement buffer still hasn't been sent error signal after " + maxSeconds + " seconds.");
+            }
+            Thread.sleep(interval);
+            t += interval;
+        }
+        Assert.assertSame(RdfFileInputFormat.ERROR, reader.statementCache.peek());
+        expected.expect(IOException.class);
+        try {
+            Assert.assertFalse(reader.nextKeyValue());
+        }
+        catch (Exception e) {
+            // Verify the reader's state after the failure, then rethrow so the
+            // ExpectedException rule can check the exception type.
+            Assert.assertNull(reader.getCurrentKey());
+            Assert.assertNull(reader.getCurrentValue());
+            Assert.assertFalse(reader.readerThread.isAlive());
+            Assert.assertFalse(reader.parserThread.isAlive());
+            throw e;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
new file mode 100644
index 0000000..2755732
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
@@ -0,0 +1,156 @@
+package mvm.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.RyaTableMutationsFactory;
+import mvm.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class RyaInputFormatTest {
+
+ static String username = "root", table = "rya_spo";
+ static PasswordToken password = new PasswordToken("");
+
+ static Instance instance;
+ static AccumuloRyaDAO apiImpl;
+
+ @BeforeClass
+ public static void init() throws Exception {
+ instance = new MockInstance(RyaInputFormatTest.class.getName() + ".mock_instance");
+ Connector connector = instance.getConnector(username, password);
+ connector.tableOperations().create(table);
+
+ AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ conf.setTablePrefix("rya_");
+ conf.setDisplayQueryPlan(false);
+
+ apiImpl = new AccumuloRyaDAO();
+ apiImpl.setConf(conf);
+ apiImpl.setConnector(connector);
+ }
+
+ @Before
+ public void before() throws Exception {
+ apiImpl.init();
+ }
+
+ @After
+ public void after() throws Exception {
+ apiImpl.dropAndDestroy();
+ }
+
+ @Test
+ public void testInputFormat() throws Exception {
+
+
+ RyaStatement input = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setColumnVisibility(new byte[0])
+ .setValue(new byte[0])
+ .build();
+
+ apiImpl.add(input);
+
+ Job jobConf = Job.getInstance();
+
+ RyaInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+ RyaInputFormat.setConnectorInfo(jobConf, username, password);
+ RyaInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+
+ AccumuloInputFormat.setInputTableName(jobConf, table);
+ AccumuloInputFormat.setInputTableName(jobConf, table);
+ AccumuloInputFormat.setScanIsolation(jobConf, false);
+ AccumuloInputFormat.setLocalIterators(jobConf, false);
+ AccumuloInputFormat.setOfflineTableScan(jobConf, false);
+
+ RyaInputFormat inputFormat = new RyaInputFormat();
+
+ JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+ List<InputSplit> splits = inputFormat.getSplits(context);
+
+ Assert.assertEquals(1, splits.size());
+
+ TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+ RecordReader<Text, RyaStatementWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+ RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
+ ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+ List<RyaStatement> results = new ArrayList<RyaStatement>();
+ while(ryaStatementRecordReader.nextKeyValue()) {
+ RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue();
+ RyaStatement value = writable.getRyaStatement();
+ Text text = ryaStatementRecordReader.getCurrentKey();
+ RyaStatement stmt = RyaStatement.builder()
+ .setSubject(value.getSubject())
+ .setPredicate(value.getPredicate())
+ .setObject(value.getObject())
+ .setContext(value.getContext())
+ .setQualifier(value.getQualifer())
+ .setColumnVisibility(value.getColumnVisibility())
+ .setValue(value.getValue())
+ .build();
+ results.add(stmt);
+
+ System.out.println(text);
+ System.out.println(value);
+ }
+
+ Assert.assertTrue(results.size() == 2);
+ Assert.assertTrue(results.contains(input));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
new file mode 100644
index 0000000..a48afa3
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaOutputFormatTest.java
@@ -0,0 +1,324 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.vividsolutions.jts.geom.Coordinate;
+import com.vividsolutions.jts.geom.GeometryFactory;
+import com.vividsolutions.jts.geom.Point;
+import com.vividsolutions.jts.geom.PrecisionModel;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.indexing.StatementConstraints;
+import mvm.rya.indexing.TemporalInstant;
+import mvm.rya.indexing.TemporalInstantRfc3339;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.SimpleTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.geo.GeoConstants;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class RyaOutputFormatTest {
+ private static final String CV = "test_auth";
+ private static final String GRAPH = "http://example.org/graph";
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private static final String INSTANCE_NAME = RyaOutputFormatTest.class.getSimpleName() + ".rya_output";
+ private static final String PREFIX = "ryaoutputformattest_";
+
+ MockInstance instance;
+ Connector connector;
+ AccumuloRyaDAO dao;
+ AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ Job job;
+ RyaTripleContext ryaContext;
+
+ @Before
+ public void init() throws Exception {
+ MRUtils.setACMock(conf, true);
+ MRUtils.setACInstance(conf, INSTANCE_NAME);
+ MRUtils.setACUserName(conf, USERNAME);
+ MRUtils.setACPwd(conf, PASSWORD);
+ MRUtils.setTablePrefix(conf, PREFIX);
+ conf.setTablePrefix(PREFIX);
+ conf.setAuths(CV);
+ conf.set(ConfigUtils.CLOUDBASE_INSTANCE, INSTANCE_NAME);
+ conf.set(ConfigUtils.CLOUDBASE_USER, USERNAME);
+ conf.set(ConfigUtils.CLOUDBASE_PASSWORD, PASSWORD);
+ conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+ conf.setClass(ConfigUtils.TOKENIZER_CLASS, SimpleTokenizer.class, Tokenizer.class);
+ ryaContext = RyaTripleContext.getInstance(conf);
+ instance = new MockInstance(INSTANCE_NAME);
+ connector = instance.getConnector(USERNAME, new PasswordToken(PASSWORD));
+ job = Job.getInstance(conf);
+ RyaOutputFormat.setMockInstance(job, instance.getInstanceName());
+ AccumuloOutputFormat.setConnectorInfo(job, USERNAME, new PasswordToken(PASSWORD));
+ AccumuloOutputFormat.setCreateTables(job, true);
+ AccumuloOutputFormat.setDefaultTableName(job, PREFIX + "default");
+ RyaOutputFormat.setTablePrefix(job, PREFIX);
+ }
+
+ private void write(RyaStatement... input) throws IOException, InterruptedException {
+ RecordWriter<Writable, RyaStatementWritable> writer =
+ new RyaOutputFormat.RyaRecordWriter(job.getConfiguration());
+ for (RyaStatement rstmt : input) {
+ RyaStatementWritable rsw = new RyaStatementWritable(rstmt, ryaContext);
+ writer.write(new Text("unused"), rsw);
+ }
+ writer.close(null);
+ }
+
+ @Test
+ public void testOutputFormat() throws Exception {
+ RyaStatement input = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setColumnVisibility(CV.getBytes())
+ .setValue(new byte[0])
+ .setContext(new RyaURI(GRAPH))
+ .build();
+ RyaOutputFormat.setCoreTablesEnabled(job, true);
+ RyaOutputFormat.setFreeTextEnabled(job, false);
+ RyaOutputFormat.setTemporalEnabled(job, false);
+ RyaOutputFormat.setGeoEnabled(job, false);
+ RyaOutputFormat.setEntityEnabled(job, false);
+ write(input);
+ TestUtils.verify(connector, conf, input);
+ }
+
+ @Test
+ public void testDefaultCV() throws Exception {
+ RyaStatement input = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setValue(new byte[0])
+ .setContext(new RyaURI(GRAPH))
+ .build();
+ RyaStatement expected = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setValue(new byte[0])
+ .setContext(new RyaURI(GRAPH))
+ .setColumnVisibility(CV.getBytes())
+ .build();
+ RyaOutputFormat.setCoreTablesEnabled(job, true);
+ RyaOutputFormat.setFreeTextEnabled(job, false);
+ RyaOutputFormat.setTemporalEnabled(job, false);
+ RyaOutputFormat.setGeoEnabled(job, false);
+ RyaOutputFormat.setEntityEnabled(job, false);
+ RyaOutputFormat.setDefaultVisibility(job, CV);
+ write(input);
+ TestUtils.verify(connector, conf, expected);
+ }
+
+ @Test
+ public void testDefaultGraph() throws Exception {
+ RyaStatement input = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setValue(new byte[0])
+ .setColumnVisibility(CV.getBytes())
+ .build();
+ RyaStatement expected = RyaStatement.builder()
+ .setSubject(new RyaURI("http://www.google.com"))
+ .setPredicate(new RyaURI("http://some_other_uri"))
+ .setObject(new RyaURI("http://www.yahoo.com"))
+ .setValue(new byte[0])
+ .setColumnVisibility(CV.getBytes())
+ .setContext(new RyaURI(GRAPH))
+ .build();
+ RyaOutputFormat.setCoreTablesEnabled(job, true);
+ RyaOutputFormat.setFreeTextEnabled(job, false);
+ RyaOutputFormat.setTemporalEnabled(job, false);
+ RyaOutputFormat.setGeoEnabled(job, false);
+ RyaOutputFormat.setEntityEnabled(job, false);
+ RyaOutputFormat.setDefaultContext(job, GRAPH);
+ write(input);
+ TestUtils.verify(connector, conf, expected);
+ }
+
+ @Test
+ public void testFreeTextIndexing() throws Exception {
+ AccumuloFreeTextIndexer ft = new AccumuloFreeTextIndexer();
+ ft.setConf(conf);
+ RyaStatement input = RyaStatement.builder()
+ .setSubject(new RyaURI(GRAPH + ":s"))
+ .setPredicate(new RyaURI(GRAPH + ":p"))
+ .setObject(new RyaType(XMLSchema.STRING, "one two three four five"))
+ .build();
+ RyaOutputFormat.setCoreTablesEnabled(job, false);
+ RyaOutputFormat.setFreeTextEnabled(job, true);
+ RyaOutputFormat.setTemporalEnabled(job, false);
+ RyaOutputFormat.setGeoEnabled(job, false);
+ RyaOutputFormat.setEntityEnabled(job, false);
+ write(input);
+ Set<Statement> empty = new HashSet<>();
+ Set<Statement> expected = new HashSet<>();
+ expected.add(RyaToRdfConversions.convertStatement(input));
+ Assert.assertEquals(expected, getSet(ft.queryText("one", new StatementConstraints())));
+ Assert.assertEquals(empty, getSet(ft.queryText("!two", new StatementConstraints())));
+ Assert.assertEquals(expected, getSet(ft.queryText("*r", new StatementConstraints())));
+ Assert.assertEquals(empty, getSet(ft.queryText("r*", new StatementConstraints())));
+ Assert.assertEquals(expected, getSet(ft.queryText("!r*", new StatementConstraints())));
+ Assert.assertEquals(expected, getSet(ft.queryText("t* & !s*", new StatementConstraints())));
+ ft.close();
+ }
+
+    /**
+     * Writes one statement per instant with only temporal indexing enabled, then
+     * checks the before/after queries of {@link AccumuloTemporalIndexer}.
+     *
+     * The last two instants are built from identical arguments, so the "tail"
+     * set is populated from both {@code statements[2]} and {@code statements[3]}.
+     * The indexer is closed in a {@code finally} block so a failed assertion
+     * does not leave it open.
+     *
+     * @throws Exception if writing, querying, or closing the indexer fails
+     */
+    @Test
+    public void testTemporalIndexing() throws Exception {
+        // Plain decimal literals: a leading zero would make these octal literals.
+        TemporalInstant[] instants = {
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 1),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 2),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3),
+                new TemporalInstantRfc3339(2015, 12, 30, 12, 0, 3)
+        };
+        Statement[] statements = new Statement[instants.length];
+        // Only the temporal index should receive the statements.
+        RyaOutputFormat.setCoreTablesEnabled(job, false);
+        RyaOutputFormat.setFreeTextEnabled(job, false);
+        RyaOutputFormat.setTemporalEnabled(job, true);
+        RyaOutputFormat.setGeoEnabled(job, false);
+        RyaOutputFormat.setEntityEnabled(job, false);
+        ValueFactory vf = new ValueFactoryImpl();
+        for (int i = 0; i < instants.length; i++) {
+            RyaType time = RdfToRyaConversions.convertLiteral(vf.createLiteral(instants[i].toString()));
+            RyaStatement input = RyaStatement.builder()
+                    .setSubject(new RyaURI(GRAPH + ":s"))
+                    .setPredicate(new RyaURI(GRAPH + ":p"))
+                    .setObject(time)
+                    .build();
+            write(input);
+            statements[i] = RyaToRdfConversions.convertStatement(input);
+        }
+        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        temporal.setConf(conf);
+        try {
+            Set<Statement> empty = new HashSet<>();
+            Set<Statement> head = new HashSet<>();
+            Set<Statement> tail = new HashSet<>();
+            head.add(statements[0]);
+            tail.add(statements[2]);
+            tail.add(statements[3]);
+            // Nothing precedes the first instant and nothing follows the last one.
+            Assert.assertEquals(empty, getSet(temporal.queryInstantBeforeInstant(instants[0], new StatementConstraints())));
+            Assert.assertEquals(empty, getSet(temporal.queryInstantAfterInstant(instants[3], new StatementConstraints())));
+            // Splitting at the second instant separates the first statement from the last two.
+            Assert.assertEquals(head, getSet(temporal.queryInstantBeforeInstant(instants[1], new StatementConstraints())));
+            Assert.assertEquals(tail, getSet(temporal.queryInstantAfterInstant(instants[1], new StatementConstraints())));
+        } finally {
+            temporal.close();
+        }
+    }
+
+    /**
+     * Writes a statement whose object is the WKT literal {@code Point(2 2)} with
+     * only geo indexing enabled, then checks that {@link GeoMesaGeoIndexer}
+     * returns nothing for a "contains" query on point (1, 1) and returns the
+     * statement for an "equals" query on point (2, 2).
+     *
+     * The indexer is closed in a {@code finally} block so a failed write or
+     * assertion does not leave it open.
+     *
+     * @throws Exception if writing, querying, or closing the indexer fails
+     */
+    @Test
+    public void testGeoIndexing() throws Exception {
+        GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326);
+        Point p1 = gf.createPoint(new Coordinate(1, 1));
+        Point p2 = gf.createPoint(new Coordinate(2, 2));
+        GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer();
+        geo.setConf(conf);
+        try {
+            RyaStatement input = RyaStatement.builder()
+                    .setSubject(new RyaURI(GRAPH + ":s"))
+                    .setPredicate(new RyaURI(GRAPH + ":p"))
+                    .setObject(new RyaType(GeoConstants.XMLSCHEMA_OGC_WKT, "Point(2 2)"))
+                    .build();
+            // Only the geo index should receive the statement.
+            RyaOutputFormat.setCoreTablesEnabled(job, false);
+            RyaOutputFormat.setFreeTextEnabled(job, false);
+            RyaOutputFormat.setTemporalEnabled(job, false);
+            RyaOutputFormat.setGeoEnabled(job, true);
+            RyaOutputFormat.setEntityEnabled(job, false);
+            write(input);
+            Set<Statement> expected = new HashSet<>();
+            // Still empty here: the non-matching point must yield no results.
+            Assert.assertEquals(expected, getSet(geo.queryContains(p1, new StatementConstraints())));
+            expected.add(RyaToRdfConversions.convertStatement(input));
+            Assert.assertEquals(expected, getSet(geo.queryEquals(p2, new StatementConstraints())));
+        } finally {
+            geo.close();
+        }
+    }
+
+    /**
+     * Writes a single statement with only entity-centric indexing enabled, then
+     * scans the entity table directly and checks that every row deserializes
+     * back to the statement that was written.
+     *
+     * Both the index and the scanner are released in {@code finally} blocks so
+     * that a failure part-way through does not leak them.
+     *
+     * @throws Exception if writing, scanning, or deserializing fails
+     */
+    @Test
+    public void testEntityIndexing() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI(GRAPH + ":s"))
+                .setPredicate(new RyaURI(GRAPH + ":p"))
+                .setObject(new RyaURI(GRAPH + ":o"))
+                .build();
+        EntityCentricIndex entity = new EntityCentricIndex();
+        entity.setConf(conf);
+        try {
+            // Only the entity index should receive the statement.
+            RyaOutputFormat.setCoreTablesEnabled(job, false);
+            RyaOutputFormat.setFreeTextEnabled(job, false);
+            RyaOutputFormat.setTemporalEnabled(job, false);
+            RyaOutputFormat.setGeoEnabled(job, false);
+            RyaOutputFormat.setEntityEnabled(job, true);
+            write(input);
+        } finally {
+            entity.close();
+        }
+        Set<Statement> expected = new HashSet<>();
+        Set<Statement> inserted = new HashSet<>();
+        expected.add(RyaToRdfConversions.convertStatement(input));
+        String table = ConfigUtils.getEntityTableName(conf);
+        Scanner scanner = connector.createScanner(table, new Authorizations(CV));
+        try {
+            // Collecting into a set: rows that deserialize to equal statements collapse into one entry.
+            for (Map.Entry<Key, Value> row : scanner) {
+                inserted.add(RyaToRdfConversions.convertStatement(
+                        EntityCentricIndex.deserializeStatement(row.getKey(), row.getValue())));
+            }
+        } finally {
+            scanner.close();
+        }
+        Assert.assertEquals(expected, inserted);
+    }
+
+    /**
+     * Drains an iteration into a set and then closes the iteration.
+     *
+     * @param iter the iteration to consume; it is closed before this method
+     *             returns, whether or not draining succeeded
+     * @return a new set holding every element the iteration produced
+     * @throws Exception if iterating or closing fails
+     */
+    private static <X> Set<X> getSet(CloseableIteration<X, ?> iter) throws Exception {
+        Set<X> set = new HashSet<>();
+        try {
+            while (iter.hasNext()) {
+                set.add(iter.next());
+            }
+        } finally {
+            // Callers pass query results straight in and keep no reference,
+            // so this is the only place the iteration can be released.
+            iter.close();
+        }
+        return set;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
new file mode 100644
index 0000000..8bebdf4
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/RyaStatementWritableTest.java
@@ -0,0 +1,146 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.junit.Assert;
+import org.junit.Rule;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Unit tests for {@link RyaStatementWritable}: {@code equals}, {@code compareTo},
+ * and the {@code write}/{@code readFields} round trip.
+ */
+public class RyaStatementWritableTest {
+    private static final RyaURI s1 = new RyaURI(":s");
+    private static final RyaURI p1 = new RyaURI(":p");
+    private static final RyaType o1 = new RyaType(XMLSchema.INTEGER, "123");
+    private static final RyaURI s2 = new RyaURI(":s2");
+    private static final RyaURI p2 = new RyaURI(":p2");
+    private static final RyaType o2 = new RyaType(XMLSchema.STRING, "123");
+    private static final RyaURI graph1 = new RyaURI("http://example.org/graph1");
+    private static final RyaURI graph2 = new RyaURI("http://example.org/graph2");
+    private static final byte[] cv1 = "test_visibility".getBytes();
+    private static final long t1 = 1000;
+    private static final long t2 = 1001;
+    // Baseline statement that every other fixture is compared against:
+    private static final RyaStatement rs1 = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Equivalent:
+    private static final RyaStatement rs1b = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    // Differ in one way each:
+    private static final RyaStatement rsGraph = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph2).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsCv = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+    private static final RyaStatement rsQualifier = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q2").setTimestamp(t1).build();
+    private static final RyaStatement rsTimestamp = RyaStatement.builder().setSubject(s1).setPredicate(p1).setObject(o1)
+            .setContext(graph1).setColumnVisibility(cv1).setQualifier("q1").setTimestamp(t2).build();
+    // Different triple:
+    private static final RyaStatement rs2 = RyaStatement.builder().setSubject(s2).setPredicate(p2).setObject(o2)
+            .setContext(graph1).setColumnVisibility(null).setQualifier("q1").setTimestamp(t1).build();
+
+    private static final RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
+
+    @Rule
+    public ExpectedException expected = ExpectedException.none();
+
+    /**
+     * Checks that writables wrapping equivalent statements are equal, and that a
+     * difference in triple, named graph, column visibility, qualifier, or
+     * timestamp (or a null statement, or a null argument) makes them unequal.
+     */
+    @Test
+    public void testEquals() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        // Built with a null context on purpose: equality is expected to hold regardless.
+        RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext);
+        RyaStatementWritable rswNull = new RyaStatementWritable(null, ryaContext);
+        Assert.assertEquals("Equivalent statements should be equal", rsw1, rsw1b);
+        Assert.assertFalse("equals(null) should be false", rsw1.equals(null));
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rsw2);
+        Assert.assertNotEquals("Statements representing different triples are not equal", rsw1, rswNull);
+        Assert.assertNotEquals("Statements with different named graphs are not equal", rsw1,
+                new RyaStatementWritable(rsGraph, ryaContext));
+        Assert.assertNotEquals("Statements with different column visibilities are not equal", rsw1,
+                new RyaStatementWritable(rsCv, ryaContext));
+        Assert.assertNotEquals("Statements with different column qualifiers are not equal", rsw1,
+                new RyaStatementWritable(rsQualifier, ryaContext));
+        Assert.assertNotEquals("Statements with different timestamps are not equal", rsw1,
+                new RyaStatementWritable(rsTimestamp, ryaContext));
+    }
+
+    /**
+     * Checks that {@code compareTo} returns 0 for equivalent statements, non-zero
+     * when the named graph, column visibility, qualifier, or timestamp differs,
+     * opposite signs when the operands are swapped, and throws
+     * {@link NullPointerException} for a null argument.
+     */
+    @Test
+    public void testCompareTo() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw1b = new RyaStatementWritable(rs1b, null);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, null);
+        // Must wrap rsGraph (not rsCv) so the named-graph assertion below tests the graph.
+        RyaStatementWritable rswGraph = new RyaStatementWritable(rsGraph, ryaContext);
+        RyaStatementWritable rswCv = new RyaStatementWritable(rsCv, ryaContext);
+        RyaStatementWritable rswQualifier = new RyaStatementWritable(rsQualifier, ryaContext);
+        RyaStatementWritable rswTimestamp = new RyaStatementWritable(rsTimestamp, ryaContext);
+        Assert.assertEquals("x.compareTo(x) should always return 0", 0, rsw1.compareTo(rsw1));
+        Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1.compareTo(rsw1b));
+        Assert.assertEquals("x.compareTo(x') where x and x' are equal should return 0", 0, rsw1b.compareTo(rsw1));
+        Assert.assertNotEquals("Statements with different named graphs are not equal", 0, rsw1.compareTo(rswGraph));
+        Assert.assertNotEquals("Statements with different column visibilities are not equal", 0, rsw1.compareTo(rswCv));
+        Assert.assertNotEquals("Statements with different column qualifiers are not equal", 0, rsw1.compareTo(rswQualifier));
+        Assert.assertNotEquals("Statements with different timestamps are not equal", 0, rsw1.compareTo(rswTimestamp));
+        Assert.assertEquals("compareTo in opposite directions should yield opposite signs",
+                Integer.signum(rsw1.compareTo(rsw2))*-1, Integer.signum(rsw2.compareTo(rsw1)));
+        // cycles shouldn't be possible; these comparisons can't all be negative or all be positive:
+        // NOTE(review): the third term was asserted to be 0 above, so |x| cannot reach 3 here;
+        // a third distinct statement would make this check meaningful.
+        int x = Integer.signum(rsw1.compareTo(rsw2))
+                + Integer.signum(rsw2.compareTo(rsw1b))
+                + Integer.signum(rsw1b.compareTo(rsw1));
+        Assert.assertNotEquals("compareTo cycle detected", 3, Math.abs(x));
+        // compareTo(null) should always throw an exception:
+        // (this must stay last: once the rule is armed, the test ends at the throw)
+        expected.expect(NullPointerException.class);
+        rsw1.compareTo(null);
+    }
+
+    /**
+     * Serializes two writables back to back, then reads them into a single
+     * reused instance, checking that each read yields the matching record and
+     * that the second read replaces the internal statement object.
+     */
+    @Test
+    public void testSerializeDeserialize() throws Exception {
+        RyaStatementWritable rsw1 = new RyaStatementWritable(rs1, ryaContext);
+        RyaStatementWritable rsw2 = new RyaStatementWritable(rs2, ryaContext);
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        DataOutputStream bytesOut = new DataOutputStream(bytes);
+        rsw1.write(bytesOut);
+        rsw2.write(bytesOut);
+        DataInputStream bytesIn = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
+        RyaStatementWritable deserialized = new RyaStatementWritable();
+        // Verify initial deserialization:
+        deserialized.readFields(bytesIn);
+        Assert.assertEquals("Deserialized statement not equal to original", rsw1, deserialized);
+        Assert.assertEquals("Deserialized statement has different hash code", rsw1.hashCode(), deserialized.hashCode());
+        Assert.assertEquals("original.compareTo(deserialized) should equal 0", 0, rsw1.compareTo(deserialized));
+        // Verify that a second read mutates the Writable object into the correct second record:
+        RyaStatement deserializedStatement = deserialized.getRyaStatement();
+        deserialized.readFields(bytesIn);
+        Assert.assertEquals("Deserialized statement not overwritten on second read", rsw2, deserialized);
+        // Verify that the internal RyaStatement object is recreated, not overwritten:
+        RyaStatement deserializedStatement2 = deserialized.getRyaStatement();
+        Assert.assertNotSame("Reading a second record should create a new internal RyaStatement",
+                deserializedStatement, deserializedStatement2);
+    }
+}
\ No newline at end of file