You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/07/21 12:50:30 UTC
[5/6] incubator-rya git commit: Consolidated MapReduce API and
applications into toplevel project.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
deleted file mode 100644
index f44b6aa..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.rio.RDFFormat;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.RyaStatementWritable;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-/**
- * Do bulk import of rdf files
- * Class RdfFileInputTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputTool extends AbstractAccumuloMRTool implements Tool {
-
- private String format = RDFFormat.RDFXML.getName();
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException {
- conf.set(MRUtils.JOB_NAME_PROP, "Rdf File Input");
- //faster
- init();
- format = conf.get(MRUtils.FORMAT_PROP, format);
- conf.set(MRUtils.FORMAT_PROP, format);
-
- String inputPath = conf.get(MRUtils.INPUT_PATH, args[0]);
-
- Job job = new Job(conf);
- job.setJarByClass(RdfFileInputTool.class);
-
- // set up cloudbase input
- job.setInputFormatClass(RdfFileInputFormat.class);
- RdfFileInputFormat.addInputPath(job, new Path(inputPath));
-
- // set input output of the particular job
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(RyaStatementWritable.class);
-
- setupOutputFormat(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-
- // set mapper and reducer classes
- job.setMapperClass(StatementToMutationMapper.class);
- job.setNumReduceTasks(0);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> {
- protected String tablePrefix;
- protected Text spo_table;
- protected Text po_table;
- protected Text osp_table;
- private byte[] cv = EMPTY_CV.getExpression();
- RyaTableMutationsFactory mut;
-
- public StatementToMutationMapper() {
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- mut = new RyaTableMutationsFactory(RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)));
- tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
- spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
- final String cv_s = conf.get(MRUtils.AC_CV_PROP);
- if (cv_s != null)
- cv = cv_s.getBytes();
- }
-
- @Override
- protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
- RyaStatement statement = value.getRyaStatement();
- if (statement.getColumnVisibility() == null) {
- statement.setColumnVisibility(cv);
- }
- Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
- mut.serialize(statement);
- Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
- Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
- Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
- for (Mutation m : spo) {
- context.write(spo_table, m);
- }
- for (Mutation m : po) {
- context.write(po_table, m);
- }
- for (Mutation m : osp) {
- context.write(osp_table, m);
- }
- }
-
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
deleted file mode 100644
index 89f0aa5..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/upgrade/Upgrade322Tool.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package mvm.rya.accumulo.mr.upgrade;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.RegExFilter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.calrissian.mango.types.LexiTypeEncoders;
-import org.calrissian.mango.types.TypeEncoder;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
-
-/** MapReduce tool ("Upgrade to Rya 3.2.2") whose mapper reads OSP rows and emits delete/put mutations keyed by the OSP, PO and SPO table names.
- */
-public class Upgrade322Tool extends AbstractAccumuloMRTool implements Tool {
- @Override
- public int run(String[] strings) throws Exception { // returns 0 when the job completes successfully, 1 otherwise
- conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
- //faster
- init();
-
- Job job = new Job(conf);
- job.setJarByClass(Upgrade322Tool.class);
-
- setupInputFormat(job);
- AccumuloInputFormat.setInputTableName(job, tablePrefix + TBL_OSP_SUFFIX); // input is the OSP table; the mapper parses rows in that layout
-
- //we do not need to change any row that is a string, custom, or uri type
- IteratorSetting regex = new IteratorSetting(30, "regex",
- RegExFilter.class);
- RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false);
- RegExFilter.setNegate(regex, true); // NOTE(review): 'regex' is never attached to the job in this method - confirm whether AccumuloInputFormat.addIterator(job, regex) was intended
-
- // set input output of the particular job
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-
- setupOutputFormat(job, tablePrefix +
- TBL_SPO_SUFFIX);
-
- // set mapper and reducer classes
- job.setMapperClass(Upgrade322Mapper.class);
- job.setReducerClass(Reducer.class); // the base Reducer class itself, no custom reduce logic
-
- // Submit the job
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new Upgrade322Tool(), args);
- } catch (Exception e) {
- e.printStackTrace(); // failure is only printed; no exit code is set here
- }
- }
-
- /**
- * Reading from the OSP table
- */
- public static class Upgrade322Mapper extends Mapper<Key, Value, Text, Mutation> {
-
- private String tablePrefix;
- private Text spoTable; // output keys: the table name each mutation is written under
- private Text poTable;
- private Text ospTable;
-
- private final UpgradeObjectSerialization upgradeObjectSerialization;
-
- public Upgrade322Mapper() {
- this(new UpgradeObjectSerialization());
- }
-
- public Upgrade322Mapper(
- UpgradeObjectSerialization upgradeObjectSerialization) { // lets a caller supply its own serializer instance
- this.upgradeObjectSerialization = upgradeObjectSerialization;
- }
-
- @Override
- protected void setup(
- Context context) throws IOException, InterruptedException {
- super.setup(context);
-
- tablePrefix = context.getConfiguration().get(
- MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); // falls back to TBL_PRFX_DEF when no prefix is configured
- spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
- poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
- ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
- }
-
- @Override
- protected void map(
- Key key, Value value, Context context)
- throws IOException, InterruptedException {
-
- //read the key, expect OSP
- final String row = key.getRow().toString();
- final int firstDelim = row.indexOf(DELIM); // end of the object part
- final int secondDelim = row.indexOf(DELIM, firstDelim + 1); // end of the subject part
- final int typeDelim = row.lastIndexOf(TYPE_DELIM); // end of the predicate part
- final String oldSerialization = row.substring(0, firstDelim);
- char typeMarker = row.charAt(row.length() - 1); // the last character of the row is taken as the type marker
-
- final String subject = row.substring(firstDelim + 1, secondDelim);
- final String predicate = row.substring(secondDelim + 1, typeDelim);
- final String typeSuffix = TYPE_DELIM + typeMarker; // re-appended unchanged to every rebuilt row
-
- String newSerialization = upgradeObjectSerialization.upgrade(oldSerialization, typeMarker);
- if(newSerialization == null) { // null: the serializer has no case for this type marker, so nothing is written for the row
- return;
- }
-
- //write out delete Mutations
- Mutation deleteOldSerialization_osp = new Mutation(key.getRow());
- deleteOldSerialization_osp.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
- key.getColumnVisibilityParsed());
- Mutation deleteOldSerialization_po = new Mutation(predicate + DELIM + oldSerialization + DELIM + subject + typeSuffix); // PO row rebuilt from the parsed OSP parts
- deleteOldSerialization_po.putDelete(key.getColumnFamily(),
- key.getColumnQualifier(),
- key.getColumnVisibilityParsed());
- Mutation deleteOldSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + oldSerialization + typeSuffix); // SPO row rebuilt from the parsed OSP parts
- deleteOldSerialization_spo.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
- key.getColumnVisibilityParsed());
-
- //write out new serialization
- Mutation putNewSerialization_osp = new Mutation(newSerialization + DELIM + subject + DELIM + predicate + typeSuffix);
- putNewSerialization_osp.put(key.getColumnFamily(),
- key.getColumnQualifier(),
- key.getColumnVisibilityParsed(),
- key.getTimestamp(), value); // column, visibility, timestamp and value are copied from the entry being replaced
- Mutation putNewSerialization_po = new Mutation(predicate + DELIM + newSerialization + DELIM + subject + typeSuffix);
- putNewSerialization_po.put(key.getColumnFamily(),
- key.getColumnQualifier(),
- key.getColumnVisibilityParsed(),
- key.getTimestamp(), value);
- Mutation putNewSerialization_spo = new Mutation(subject + DELIM + predicate + DELIM + newSerialization + typeSuffix);
- putNewSerialization_spo.put(key.getColumnFamily(),
- key.getColumnQualifier(),
- key.getColumnVisibilityParsed(),
- key.getTimestamp(), value);
-
- //write out deletes to all tables
- context.write(ospTable, deleteOldSerialization_osp);
- context.write(poTable, deleteOldSerialization_po);
- context.write(spoTable, deleteOldSerialization_spo);
-
- //write out inserts to all tables
- context.write(ospTable, putNewSerialization_osp);
- context.write(poTable, putNewSerialization_po);
- context.write(spoTable, putNewSerialization_spo);
- }
- }
-
- public static class UpgradeObjectSerialization {
-
- public static final TypeEncoder<Boolean, String>
- BOOLEAN_STRING_TYPE_ENCODER = LexiTypeEncoders.booleanEncoder();
- public static final TypeEncoder<Byte, String> BYTE_STRING_TYPE_ENCODER
- = LexiTypeEncoders.byteEncoder();
- public static final TypeEncoder<Date, String> DATE_STRING_TYPE_ENCODER
- = LexiTypeEncoders.dateEncoder();
- public static final TypeEncoder<Integer, String>
- INTEGER_STRING_TYPE_ENCODER = LexiTypeEncoders.integerEncoder();
- public static final TypeEncoder<Long, String> LONG_STRING_TYPE_ENCODER
- = LexiTypeEncoders.longEncoder();
- public static final TypeEncoder<Double, String>
- DOUBLE_STRING_TYPE_ENCODER = LexiTypeEncoders.doubleEncoder();
-
- public String upgrade(String object, int typeMarker) { // returns the re-encoded object, or null for type markers not listed below
- switch(typeMarker) {
- case 10: //boolean
- final boolean bool = Boolean.parseBoolean(object);
- return BOOLEAN_STRING_TYPE_ENCODER.encode(bool);
- case 9: //byte
- final byte b = Byte.parseByte(object);
- return BYTE_STRING_TYPE_ENCODER.encode(b);
- case 4: //long
- final Long lng = Long.parseLong(object);
- return LONG_STRING_TYPE_ENCODER.encode(lng);
- case 5: //int
- final Integer i = Integer.parseInt(object);
- return INTEGER_STRING_TYPE_ENCODER.encode(i);
- case 6: //double
- String exp = object.substring(2, 5); // characters 2-4 are read as the exponent digits
- char valueSign = object.charAt(0); // character 0: sign of the value
- char expSign = object.charAt(1); // character 1: sign of the exponent
- Integer expInt = Integer.parseInt(exp);
- if (expSign == '-') {
- expInt = 999 - expInt; // NOTE(review): presumably undoes a 999-complement applied to negative exponents in the old format - confirm
- }
- final String expDoubleStr =
- String.format("%s%sE%s%d", valueSign,
- object.substring(6), // the mantissa text starts at index 6
- expSign, expInt);
- return DOUBLE_STRING_TYPE_ENCODER
- .encode(Double.parseDouble(expDoubleStr));
- case 7: //datetime
- //check to see if it is an early release that includes the exact term xsd:dateTime
- final Long l = Long.MAX_VALUE - Long.parseLong(object); // the stored number is subtracted from Long.MAX_VALUE to get the epoch millis passed to Date
- Date date = new Date(l);
- return DATE_STRING_TYPE_ENCODER.encode(date);
- default:
- return null;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
deleted file mode 100644
index c9dac6b..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloHDFSFileInputFormat.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileSKVIterator;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
-import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-
-/**
- * Finds the accumulo tablet files on the hdfs disk, and uses that as the input for MR jobs.
- * <p>
- * One split is produced per file listed for the table in the metadata table;
- * each split is read as a whole with an RFile reader.
- */
-public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> {
-
-    /** Range handed to the RFile reader so that every entry of a file is read. */
-    public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD"));
-
-    /**
-     * Builds one {@link FileSplit} per file registered for the configured table.
-     *
-     * @param jobContext job whose configuration carries the Accumulo connection
-     *                   settings (read through {@link AccumuloProps})
-     * @return one split per file, each covering the whole file
-     * @throws IOException wrapping any failure while talking to Accumulo or HDFS
-     */
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
-        //read the params from AccumuloInputFormat
-        Configuration conf = jobContext.getConfiguration();
-        Instance instance = AccumuloProps.getInstance(jobContext);
-        String user = AccumuloProps.getUsername(jobContext);
-        AuthenticationToken password = AccumuloProps.getPassword(jobContext);
-        String table = AccumuloProps.getTablename(jobContext);
-        ArgumentChecker.notNull(instance);
-        ArgumentChecker.notNull(table);
-
-        //find the files necessary
-        try {
-            AccumuloConfiguration acconf = instance.getConfiguration();
-            FileSystem fs = FileSystem.get(conf);
-            Connector connector = instance.getConnector(user, password);
-            TableOperations tos = connector.tableOperations();
-            String tableId = tos.tableIdMap().get(table);
-            String filePrefix = acconf.get(Property.INSTANCE_DFS_DIR) + "/tables/" + tableId;
-            System.out.println(filePrefix);
-
-            Scanner scanner = connector.createScanner("!METADATA", Constants.NO_AUTHS); //TODO: auths?
-            scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD")));
-            scanner.fetchColumnFamily(new Text("file"));
-            List<String> files = new ArrayList<String>();
-            List<InputSplit> fileSplits = new ArrayList<InputSplit>();
-            for (Map.Entry<Key, Value> entry : scanner) {
-                String file = filePrefix + entry.getKey().getColumnQualifier().toString();
-                files.add(file);
-                Path path = new Path(file);
-                FileStatus fileStatus = fs.getFileStatus(path);
-                long len = fileStatus.getLen();
-                BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len);
-                // A file without block locations gets a split with no preferred
-                // hosts instead of failing on an empty array.
-                String[] hosts = (fileBlockLocations != null && fileBlockLocations.length > 0)
-                        ? fileBlockLocations[0].getHosts()
-                        : new String[0];
-                fileSplits.add(new FileSplit(path, 0, len, hosts));
-            }
-            System.out.println(files);
-            return fileSplits;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * Creates a reader that iterates over every entry of the split's file.
-     *
-     * @return a reader positioned before the first entry of the file
-     */
-    @Override
-    public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-        return new RecordReader<Key, Value>() {
-
-            private FileSKVIterator fileSKVIterator;
-            /** False until the first call to nextKeyValue(); the reader is opened already seeked to ALLRANGE. */
-            private boolean started = false;
-
-            @Override
-            public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-                FileSplit split = (FileSplit) inputSplit;
-                Configuration job = taskAttemptContext.getConfiguration();
-                Path file = split.getPath();
-                FileSystem fs = file.getFileSystem(job);
-                Instance instance = AccumuloProps.getInstance(taskAttemptContext);
-
-                fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE,
-                        new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration());
-            }
-
-            @Override
-            public boolean nextKeyValue() throws IOException, InterruptedException {
-                if (started) {
-                    fileSKVIterator.next();
-                } else {
-                    // Do not advance on the first call, otherwise the first
-                    // entry of the file would be skipped.
-                    started = true;
-                }
-                return fileSKVIterator.hasTop();
-            }
-
-            @Override
-            public Key getCurrentKey() throws IOException, InterruptedException {
-                return fileSKVIterator.getTopKey();
-            }
-
-            @Override
-            public Value getCurrentValue() throws IOException, InterruptedException {
-                return fileSKVIterator.getTopValue();
-            }
-
-            @Override
-            public float getProgress() throws IOException, InterruptedException {
-                return 0;
-            }
-
-            @Override
-            public void close() throws IOException {
-                // Release the underlying file reader; it may be null when
-                // initialize() was never called or failed.
-                if (fileSKVIterator != null) {
-                    fileSKVIterator.close();
-                }
-            }
-        };
-    }
-
-    /**
-     * Ad-hoc driver with hard-coded connection settings. It runs a no-op job
-     * using this input format when no argument is given, and
-     * {@link AccumuloInputFormat} otherwise.
-     */
-    public static void main(String[] args) {
-        try {
-            Job job = new Job(new Configuration());
-            job.setJarByClass(AccumuloHDFSFileInputFormat.class);
-            Configuration conf = job.getConfiguration();
-            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-            AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
-            AccumuloInputFormat.setInputTableName(job, "l_spo");
-            AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS);
-            AccumuloInputFormat.setZooKeeperInstance(job, "acu13", "stratus25:2181");
-            AccumuloInputFormat.setRanges(job, Collections.singleton(ALLRANGE));
-            job.setMapperClass(NullMapper.class);
-            job.setNumReduceTasks(0);
-            job.setOutputFormatClass(NullOutputFormat.class);
-            if (args.length == 0) {
-                job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
-            } else {
-                job.setInputFormatClass(AccumuloInputFormat.class);
-            }
-            job.waitForCompletion(true);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /** Mapper that discards every record. */
-    @SuppressWarnings("rawtypes")
-    public static class NullMapper extends Mapper {
-        @Override
-        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
-            // intentionally empty
-        }
-    }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
deleted file mode 100644
index 2b89440..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/AccumuloProps.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Property holder that re-exposes the static job-configuration getters of
- * {@link InputFormatBase} as public methods. It is not a usable input format:
- * {@link #createRecordReader} always throws.
- */
-@SuppressWarnings("rawtypes")
-public class AccumuloProps extends InputFormatBase {
-
-    /**
-     * @return the table name returned by {@link InputFormatBase} for the given job
-     */
-    public static String getTablename(JobContext conf) {
-        // The explicit InputFormatBase qualifier is kept on every delegate call.
-        final String tableName = InputFormatBase.getInputTableName(conf);
-        return tableName;
-    }
-
-    /**
-     * @return the principal returned by {@link InputFormatBase} for the given job
-     */
-    public static String getUsername(JobContext conf) {
-        final String principal = InputFormatBase.getPrincipal(conf);
-        return principal;
-    }
-
-    /**
-     * @return the authentication token returned by {@link InputFormatBase} for the given job
-     */
-    public static AuthenticationToken getPassword(JobContext conf) {
-        final AuthenticationToken token = InputFormatBase.getAuthenticationToken(conf);
-        return token;
-    }
-
-    /**
-     * @return the Accumulo instance returned by {@link InputFormatBase} for the given job
-     */
-    public static Instance getInstance(JobContext conf) {
-        // Qualifier is required here: an unqualified call would resolve to this method.
-        final Instance instance = InputFormatBase.getInstance(conf);
-        return instance;
-    }
-
-    /**
-     * Not supported; this class only holds properties.
-     *
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException("Accumulo Props just holds properties");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
deleted file mode 100644
index c3003d3..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/utils/MRUtils.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package mvm.rya.accumulo.mr.utils;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.openrdf.model.URI;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-/**
- * Configuration property names and small accessors shared by the MapReduce tools.
- * Class MRUtils
- * Date: May 19, 2011
- * Time: 10:34:06 AM
- */
-public class MRUtils {
-
-    public static final String JOB_NAME_PROP = "mapred.job.name";
-
-    public static final String AC_USERNAME_PROP = "ac.username";
-    public static final String AC_PWD_PROP = "ac.pwd";
-    public static final String AC_ZK_PROP = "ac.zk";
-    public static final String AC_INSTANCE_PROP = "ac.instance";
-    public static final String AC_TTL_PROP = "ac.ttl";
-    public static final String AC_TABLE_PROP = "ac.table";
-    public static final String AC_AUTH_PROP = "ac.auth";
-    public static final String AC_CV_PROP = "ac.cv";
-    public static final String AC_MOCK_PROP = "ac.mock";
-    public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput";
-    /**
-     * Hadoop's sort-buffer size property. This used to repeat the value of
-     * {@link #AC_HDFS_INPUT_PROP}, so setting it overwrote the HDFS-input
-     * setting instead of the sort buffer.
-     */
-    public static final String HADOOP_IO_SORT_MB = "io.sort.mb";
-    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
-    public static final String FORMAT_PROP = "rdf.format";
-    public static final String INPUT_PATH = "input";
-
-    public static final String NAMED_GRAPH_PROP = "rdf.graph";
-
-    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
-
-    // rdf constants
-    public static final ValueFactory vf = new ValueFactoryImpl();
-    public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type");
-
-
-    // cloudbase map reduce utils
-
-//    public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException {
-//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
-//        startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key));
-//        if (entry_val != null) {
-//            startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES);
-//            startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val));
-//        }
-//        byte[] startrow = startRowOut.toByteArray();
-//        startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES);
-//        byte[] stoprow = startRowOut.toByteArray();
-//
-//        Range range = new Range(new Text(startrow), new Text(stoprow));
-//        return range;
-//    }
-
-
-    /** @return the value of {@link #AC_TTL_PROP}, or {@code null} when unset */
-    public static String getACTtl(Configuration conf) {
-        return conf.get(AC_TTL_PROP);
-    }
-
-    /** @return the value of {@link #AC_USERNAME_PROP}, or {@code null} when unset */
-    public static String getACUserName(Configuration conf) {
-        return conf.get(AC_USERNAME_PROP);
-    }
-
-    /** @return the value of {@link #AC_PWD_PROP}, or {@code null} when unset */
-    public static String getACPwd(Configuration conf) {
-        return conf.get(AC_PWD_PROP);
-    }
-
-    /** @return the value of {@link #AC_ZK_PROP}, or {@code null} when unset */
-    public static String getACZK(Configuration conf) {
-        return conf.get(AC_ZK_PROP);
-    }
-
-    /** @return the value of {@link #AC_INSTANCE_PROP}, or {@code null} when unset */
-    public static String getACInstance(Configuration conf) {
-        return conf.get(AC_INSTANCE_PROP);
-    }
-
-    /** Stores {@code str} under {@link #AC_USERNAME_PROP}. */
-    public static void setACUserName(Configuration conf, String str) {
-        conf.set(AC_USERNAME_PROP, str);
-    }
-
-    /** Stores {@code str} under {@link #AC_PWD_PROP}. */
-    public static void setACPwd(Configuration conf, String str) {
-        conf.set(AC_PWD_PROP, str);
-    }
-
-    /** Stores {@code str} under {@link #AC_ZK_PROP}. */
-    public static void setACZK(Configuration conf, String str) {
-        conf.set(AC_ZK_PROP, str);
-    }
-
-    /** Stores {@code str} under {@link #AC_INSTANCE_PROP}. */
-    public static void setACInstance(Configuration conf, String str) {
-        conf.set(AC_INSTANCE_PROP, str);
-    }
-
-    /** Stores {@code str} under {@link #AC_TTL_PROP}. */
-    public static void setACTtl(Configuration conf, String str) {
-        conf.set(AC_TTL_PROP, str);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
deleted file mode 100644
index 1e74e7c..0000000
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.RyaStatementInputFormat.RyaStatementRecordReader;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaTripleContext;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class RyaInputFormatTest {
-
- static String username = "root", table = "rya_spo";
- static PasswordToken password = new PasswordToken("");
-
- static Instance instance;
- static AccumuloRyaDAO apiImpl;
-
- @BeforeClass
- public static void init() throws Exception {
- instance = new MockInstance("mock_instance");
- Connector connector = instance.getConnector(username, password);
- connector.tableOperations().create(table);
-
- AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix("rya_");
- conf.setDisplayQueryPlan(false);
-
- apiImpl = new AccumuloRyaDAO();
- apiImpl.setConf(conf);
- apiImpl.setConnector(connector);
- }
-
- @Before
- public void before() throws Exception {
- apiImpl.init();
- }
-
- @After
- public void after() throws Exception {
- apiImpl.dropAndDestroy();
- }
-
- @Test
- public void testInputFormat() throws Exception {
-
-
- RyaStatement input = RyaStatement.builder()
- .setSubject(new RyaURI("http://www.google.com"))
- .setPredicate(new RyaURI("http://some_other_uri"))
- .setObject(new RyaURI("http://www.yahoo.com"))
- .setColumnVisibility(new byte[0])
- .setValue(new byte[0])
- .build();
-
- apiImpl.add(input);
-
- Job jobConf = Job.getInstance();
-
- RyaStatementInputFormat.setMockInstance(jobConf, instance.getInstanceName());
- RyaStatementInputFormat.setConnectorInfo(jobConf, username, password);
- RyaStatementInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
-
- AccumuloInputFormat.setInputTableName(jobConf, table);
- AccumuloInputFormat.setInputTableName(jobConf, table);
- AccumuloInputFormat.setScanIsolation(jobConf, false);
- AccumuloInputFormat.setLocalIterators(jobConf, false);
- AccumuloInputFormat.setOfflineTableScan(jobConf, false);
-
- RyaStatementInputFormat inputFormat = new RyaStatementInputFormat();
-
- JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
- List<InputSplit> splits = inputFormat.getSplits(context);
-
- Assert.assertEquals(1, splits.size());
-
- TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
- RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
- RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
- ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
- List<RyaStatement> results = new ArrayList<RyaStatement>();
- while(ryaStatementRecordReader.nextKeyValue()) {
- RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue();
- RyaStatement value = writable.getRyaStatement();
- Text text = ryaStatementRecordReader.getCurrentKey();
- RyaStatement stmt = RyaStatement.builder()
- .setSubject(value.getSubject())
- .setPredicate(value.getPredicate())
- .setObject(value.getObject())
- .setContext(value.getContext())
- .setQualifier(value.getQualifer())
- .setColumnVisibility(value.getColumnVisibility())
- .setValue(value.getValue())
- .build();
- results.add(stmt);
-
- System.out.println(text);
- System.out.println(value);
- }
-
- Assert.assertTrue(results.size() == 2);
- Assert.assertTrue(results.contains(input));
- }
-
- @Test
- public void mapperTest() throws Exception {
-
- RyaStatement input = RyaStatement.builder()
- .setSubject(new RyaURI("http://www.google.com"))
- .setPredicate(new RyaURI("http://some_other_uri"))
- .setObject(new RyaURI("http://www.yahoo.com"))
- .setValue(new byte[0])
- .setTimestamp(0L)
- .build();
-
- RyaStatementWritable writable = new RyaStatementWritable();
- writable.setRyaStatement(input);
-
- RyaStatementMapper mapper = new RyaStatementMapper();
- MapDriver<Text, RyaStatementWritable, Text, Mutation> mapDriver = MapDriver.newMapDriver(mapper);
-
- RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
- RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context);
-
- Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(input);
-
- mapDriver.withInput(new Text("sometext"), writable);
-
- for(TABLE_LAYOUT key : mutations.keySet()) {
- Collection<Mutation> mutationCollection = mutations.get(key);
- for(Mutation m : mutationCollection) {
- mapDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m);
- }
- }
-
- mapDriver.runTest(false);
-
- }
-
- @Test
- public void reducerTest() throws Exception {
- RyaStatement input = RyaStatement.builder()
- .setSubject(new RyaURI("http://www.google.com"))
- .setPredicate(new RyaURI("http://some_other_uri"))
- .setObject(new RyaURI("http://www.yahoo.com"))
- .setValue(new byte[0])
- .setTimestamp(0L)
- .build();
-
- RyaStatementWritable writable = new RyaStatementWritable();
- writable.setRyaStatement(input);
-
- RyaStatementReducer reducer = new RyaStatementReducer();
- ReduceDriver<WritableComparable, RyaStatementWritable, Text, Mutation> reduceDriver = ReduceDriver.newReduceDriver(reducer);
-
- RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
- RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context);
-
- Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(input);
-
- reduceDriver.withInput(new Text("sometext"), Arrays.asList(writable));
-
- for(TABLE_LAYOUT key : mutations.keySet()) {
- Collection<Mutation> mutationCollection = mutations.get(key);
- for(Mutation m : mutationCollection) {
- reduceDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m);
- }
- }
-
- reduceDriver.runTest(false);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java
deleted file mode 100644
index bda73e2..0000000
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package mvm.rya.accumulo.mr.eval;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.hadoop.io.Text;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Created by IntelliJ IDEA.
- * Date: 4/24/12
- * Time: 5:05 PM
- * To change this template use File | Settings | File Templates.
- */
-@Ignore
-public class AccumuloRdfCountToolTest {
-
- private String user = "user";
- private String pwd = "pwd";
- private String instance = "myinstance";
- private String tablePrefix = "t_";
- private Authorizations auths = Constants.NO_AUTHS;
- private Connector connector;
-
- private AccumuloRyaDAO dao;
- private ValueFactory vf = new ValueFactoryImpl();
- private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- static String litdupsNS = "urn:test:litdups#";
-
- @Before
- public void setUp() throws Exception {
- connector = new MockInstance(instance).getConnector(user, pwd.getBytes());
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- SecurityOperations secOps = connector.securityOperations();
- secOps.createUser(user, pwd.getBytes(), auths);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE);
-
- dao = new AccumuloRyaDAO();
- dao.setConnector(connector);
- conf.setTablePrefix(tablePrefix);
- dao.setConf(conf);
- dao.init();
- }
-
- @After
- public void tearDown() throws Exception {
- dao.destroy();
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- }
-
- @Test
- public void testMR() throws Exception {
- RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1"));
- RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1"));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10))));
-
- AccumuloRdfCountTool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Drdf.tablePrefix=" + tablePrefix,
- });
-
- Map<String, Key> expectedValues = new HashMap<String, Key>();
- String row = test1.getData();
- expectedValues.put(row,
- new Key(new Text(row),
- RdfCloudTripleStoreConstants.SUBJECT_CF_TXT,
- RdfCloudTripleStoreConstants.EMPTY_TEXT));
- row = pred1.getData();
- expectedValues.put(row,
- new Key(new Text(row),
- RdfCloudTripleStoreConstants.PRED_CF_TXT,
- RdfCloudTripleStoreConstants.EMPTY_TEXT));
- Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths);
- scanner.setRange(new Range());
- int count = 0;
- for (Map.Entry<Key, Value> entry : scanner) {
- assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL));
- assertEquals(11, Long.parseLong(entry.getValue().toString()));
- count++;
- }
- assertEquals(2, count);
- }
-
-// public void testMRObject() throws Exception {
-// URI pred1 = vf.createURI(litdupsNS, "pred1");
-// Literal literal = vf.createLiteral(0);
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test0"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test1"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test2"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test3"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test4"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test5"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test6"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test7"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test8"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test9"), pred1, literal));
-// dao.add(new StatementImpl(vf.createURI(litdupsNS, "test10"), pred1, literal));
-// dao.commit();
-//
-// AccumuloRdfCountTool.main(new String[]{
-// "-Dac.mock=true",
-// "-Dac.instance=" + instance,
-// "-Dac.username=" + user,
-// "-Dac.pwd=" + pwd,
-// "-Drdf.tablePrefix=" + tablePrefix,
-// });
-//
-// Map<String, Key> expectedValues = new HashMap<String, Key>();
-// byte[] row_bytes = RdfCloudTripleStoreUtils.writeValue(literal);
-// expectedValues.put(new String(row_bytes),
-// new Key(new Text(row_bytes),
-// RdfCloudTripleStoreConstants.OBJ_CF_TXT,
-// RdfCloudTripleStoreConstants.INFO_TXT));
-// row_bytes = RdfCloudTripleStoreUtils.writeValue(pred1);
-// expectedValues.put(new String(row_bytes),
-// new Key(new Text(row_bytes),
-// RdfCloudTripleStoreConstants.PRED_CF_TXT,
-// RdfCloudTripleStoreConstants.INFO_TXT));
-// Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths);
-// scanner.setRange(new Range());
-// int count = 0;
-// for (Map.Entry<Key, Value> entry : scanner) {
-// assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL));
-// assertEquals(11, Long.parseLong(entry.getValue().toString()));
-// count++;
-// }
-// assertEquals(2, count);
-// }
-
- @Test
- public void testTTL() throws Exception {
- RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1"));
- RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1"));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9))));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10))));
-
- AccumuloRdfCountTool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Dac.ttl=0",
- "-Drdf.tablePrefix=" + tablePrefix,
- });
-
- Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths);
- scanner.setRange(new Range());
- int count = 0;
- for (Map.Entry<Key, Value> entry : scanner) {
- count++;
- }
- assertEquals(0, count);
- }
-
- @Test
- public void testContext() throws Exception {
- RyaURI test1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "test1"));
- RyaURI pred1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "pred1"));
- RyaURI cntxt = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "cntxt"));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(0)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(1)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(2)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(3)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(4)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(5)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(6)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(7)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(8)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(9)), cntxt));
- dao.add(new RyaStatement(test1, pred1, RdfToRyaConversions.convertLiteral(vf.createLiteral(10)), cntxt));
-
- AccumuloRdfCountTool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Drdf.tablePrefix=" + tablePrefix,
- });
-
- Map<String, Key> expectedValues = new HashMap<String, Key>();
- String row = test1.getData();
- expectedValues.put(row,
- new Key(new Text(row),
- RdfCloudTripleStoreConstants.SUBJECT_CF_TXT,
- new Text(cntxt.getData())));
- row = pred1.getData();
- expectedValues.put(row,
- new Key(new Text(row),
- RdfCloudTripleStoreConstants.PRED_CF_TXT,
- new Text(cntxt.getData())));
- Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, auths);
- scanner.setRange(new Range());
- int count = 0;
- for (Map.Entry<Key, Value> entry : scanner) {
- assertTrue(expectedValues.get(entry.getKey().getRow().toString()).equals(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL));
- assertEquals(11, Long.parseLong(entry.getValue().toString()));
- count++;
- }
- assertEquals(2, count);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java
deleted file mode 100644
index 02b8357..0000000
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.util.Iterator;
-import java.util.Map;
-
-import junit.framework.TestCase;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.hadoop.io.Text;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.rio.RDFFormat;
-
-/**
- * Created by IntelliJ IDEA.
- * Date: 4/25/12
- * Time: 10:51 AM
- * To change this template use File | Settings | File Templates.
- */
-public class RdfFileInputToolTest extends TestCase {
-
- private String user = "user";
- private String pwd = "pwd";
- private String instance = "myinstance";
- private String tablePrefix = "t_";
- private Authorizations auths = Constants.NO_AUTHS;
- private Connector connector;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- connector = new MockInstance(instance).getConnector(user, pwd.getBytes());
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- SecurityOperations secOps = connector.securityOperations();
- secOps.createUser(user, pwd.getBytes(), auths);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE);
- }
-
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- }
-
- public void testNTriplesInput() throws Exception {
- RdfFileInputTool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Drdf.tablePrefix=" + tablePrefix,
- "-Drdf.format=" + RDFFormat.NTRIPLES.getName(),
- "src/test/resources/test.ntriples",
- });
-
- Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, auths);
- scanner.setRange(new Range());
- Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
- ValueFactory vf = new ValueFactoryImpl();
- assertTrue(iterator.hasNext());
- RyaStatement rs = new RyaStatement(new RyaURI("urn:lubm:rdfts#GraduateStudent01"),
- new RyaURI("urn:lubm:rdfts#hasFriend"),
- new RyaURI("urn:lubm:rdfts#GraduateStudent02"));
- assertEquals(new Text(RyaTripleContext.getInstance(new AccumuloRdfConfiguration()).serializeTriple(rs).get(TABLE_LAYOUT.SPO).getRow()), iterator.next().getKey().getRow());
- }
-
- public void testInputContext() throws Exception {
- RdfFileInputTool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Drdf.tablePrefix=" + tablePrefix,
- "-Drdf.format=" + RDFFormat.TRIG.getName(),
- "src/test/resources/namedgraphs.trig",
- });
-
- Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, auths);
- scanner.setRange(new Range());
- Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
- ValueFactory vf = new ValueFactoryImpl();
- assertTrue(iterator.hasNext());
- RyaStatement rs = new RyaStatement(new RyaURI("http://www.example.org/exampleDocument#Monica"),
- new RyaURI("http://www.example.org/vocabulary#name"),
- new RyaType("Monica Murphy"),
- new RyaURI("http://www.example.org/exampleDocument#G1"));
- Key key = iterator.next().getKey();
-
- TripleRow tripleRow = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()).serializeTriple(rs).get(TABLE_LAYOUT.SPO);
- assertEquals(new Text(tripleRow.getRow()), key.getRow());
- assertEquals(new Text(tripleRow.getColumnFamily()), key.getColumnFamily());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
deleted file mode 100644
index 5ac2d74..0000000
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java
+++ /dev/null
@@ -1,319 +0,0 @@
-package mvm.rya.accumulo.mr.upgrade;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import junit.framework.TestCase;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.query.AccumuloRyaQueryEngine;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.persist.query.RyaQuery;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.*;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.calrissian.mango.collect.CloseableIterable;
-import org.openrdf.model.vocabulary.XMLSchema;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Created by IntelliJ IDEA.
- * Date: 4/25/12
- * Time: 10:51 AM
- * To change this template use File | Settings | File Templates.
- */
-public class Upgrade322ToolTest extends TestCase {
-
- private String user = "user";
- private String pwd = "pwd";
- private String instance = "myinstance";
- private String tablePrefix = "t_";
- private Authorizations auths = Constants.NO_AUTHS;
- private Connector connector;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- final String spoTable = tablePrefix +
- RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
- final String poTable = tablePrefix +
- RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
- final String ospTable = tablePrefix +
- RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-
- connector = new MockInstance(instance).getConnector(user, pwd.getBytes());
-
- connector.tableOperations().create(spoTable);
- connector.tableOperations().create(poTable);
- connector.tableOperations().create(ospTable);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- SecurityOperations secOps = connector.securityOperations();
- secOps.createUser(user, pwd.getBytes(), auths);
- secOps.grantTablePermission(user, spoTable, TablePermission.READ);
- secOps.grantTablePermission(user, poTable, TablePermission.READ);
- secOps.grantTablePermission(user, ospTable, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ);
- secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE);
-
- //load data
- final BatchWriter ospWriter = connector
- .createBatchWriter(ospTable, new BatchWriterConfig());
- ospWriter.addMutation(getMutation("00000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u0001\u0004"));
- ospWriter.addMutation(getMutation("00000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u0001\u0005"));
- ospWriter.addMutation(getMutation("00000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u0001\t"));
- ospWriter.addMutation(getMutation("00001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u0001\u0006"));
- ospWriter.addMutation(getMutation("10\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" +
- "://here/2010/tracked-data-provenance/ns#shortLit\u0001http://www.w3" +
- ".org/2001/XMLSchema#short\u0001\b"));
- ospWriter.addMutation(getMutation("10.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" +
- "://www.w3.org/2001/XMLSchema#float\u0001\b"));
- ospWriter.addMutation(getMutation("3.0.0\u0000urn:mvm.rya/2012/05#rts\u0000urn:mvm" +
- ".rya/2012/05#version\u0001\u0003"));
- ospWriter.addMutation(getMutation("9223370726404375807\u0000http://here/2010/tracked-data-provenance/ns" +
- "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" +
- "\u0001\u0007"));
- ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#Created\u0000http://here" +
- "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" +
- ".org/1999/02/22-rdf-syntax-ns#type\u0001\u0002"));
- ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http" +
- "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" +
- "/tracked-data-provenance/ns#uriLit\u0001\u0002"));
- ospWriter.addMutation(getMutation("stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0001" +
- "\u0003"));
- ospWriter.addMutation(getMutation("true\u0000http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0001\n"));
- ospWriter.flush();
- ospWriter.close();
-
- final BatchWriter spoWriter = connector
- .createBatchWriter(spoTable, new BatchWriterConfig());
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0001\u0004"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0001\u0005"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0001\t"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0001\u0006"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" +
- "://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0001http://www.w3" +
- ".org/2001/XMLSchema#short\u0001\b"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" +
- "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b"));
- spoWriter.addMutation(getMutation("urn:mvm.rya/2012/05#rts\u0000urn:mvm" +
- ".rya/2012/05#version\u00003.0.0\u0001\u0003"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns" +
- "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" +
- "\u00009223370726404375807\u0001\u0007"));
- spoWriter.addMutation(getMutation("http://here" +
- "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" +
- ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0001\u0002"));
- spoWriter.addMutation(getMutation("http" +
- "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" +
- "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0001\u0002"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0001" +
- "\u0003"));
- spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" +
- "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0001\n"));
- spoWriter.flush();
- spoWriter.close();
-
- final BatchWriter poWriter = connector
- .createBatchWriter(poTable, new BatchWriterConfig());
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0004"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0005"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\t"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0006"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http://www.w3" +
- ".org/2001/XMLSchema#short\u0001\b"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#floatLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http" +
- "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b"));
- poWriter.addMutation(getMutation("urn:mvm" +
- ".rya/2012/05#version\u00003.0.0\u0000urn:mvm.rya/2012/05#rts\u0001\u0003"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#dateLit" +
- "\u00009223370726404375807\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0007"));
- poWriter.addMutation(getMutation("http://www.w3" +
- ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002"));
- poWriter.addMutation(getMutation("http://here/2010" +
- "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001" +
- "\u0003"));
- poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\n"));
- poWriter.flush();
- poWriter.close();
- }
-
- public Mutation getMutation(String row) {
- final Mutation mutation = new Mutation(row);
- mutation.put("", "", "");
- return mutation;
- }
-
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- connector.tableOperations().delete(
- tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
- connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
- connector.tableOperations().delete(
- tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- }
-
- public void testUpgrade() throws Exception {
- Upgrade322Tool.main(new String[]{
- "-Dac.mock=true",
- "-Dac.instance=" + instance,
- "-Dac.username=" + user,
- "-Dac.pwd=" + pwd,
- "-Drdf.tablePrefix=" + tablePrefix,
- });
-
- final AccumuloRdfConfiguration configuration = new AccumuloRdfConfiguration();
- configuration.setTablePrefix(tablePrefix);
- final AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
- ryaDAO.setConnector(connector);
- ryaDAO.setConf(configuration);
- ryaDAO.init();
-
- final AccumuloRyaQueryEngine queryEngine = ryaDAO.getQueryEngine();
-
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#booleanLit"),
- new RyaType(XMLSchema.BOOLEAN, "true")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#longLit"),
- new RyaType(XMLSchema.LONG, "10")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#intLit"),
- new RyaType(XMLSchema.INTEGER, "10")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#byteLit"),
- new RyaType(XMLSchema.BYTE, "10")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#doubleLit"),
- new RyaType(XMLSchema.DOUBLE, "10.0")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#dateLit"),
- new RyaType(XMLSchema.DATETIME, "2011-07-12T06:00:00.000Z")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#stringLit"),
- new RyaType("stringLit")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns#uriLit"),
- new RyaURI("http://here/2010/tracked-data-provenance/ns" +
- "#objectuuid1")), queryEngine);
- verify(new RyaStatement(
- new RyaURI("urn:mvm.rya/2012/05#rts"),
- new RyaURI("urn:mvm.rya/2012/05#version"),
- new RyaType("3.0.0")), queryEngine);
- }
-
- private void verify(RyaStatement ryaStatement, AccumuloRyaQueryEngine queryEngine)
- throws RyaDAOException, IOException {
-
- //check osp
- CloseableIterable<RyaStatement> statements =
- queryEngine.query(RyaQuery.builder(new RyaStatement(null, null, ryaStatement.getObject()))
- .build());
- try {
- verifyFirstStatement(ryaStatement, statements);
- } finally {
- statements.close();
- }
-
- //check po
- statements = queryEngine.query(RyaQuery.builder(
- new RyaStatement(null, ryaStatement.getPredicate(),
- ryaStatement.getObject())).build());
- try {
- verifyFirstStatement(ryaStatement, statements);
- } finally {
- statements.close();
- }
-
- //check spo
- statements = queryEngine.query(RyaQuery.builder(
- new RyaStatement(ryaStatement.getSubject(),
- ryaStatement.getPredicate(),
- ryaStatement.getObject())).build());
- try {
- verifyFirstStatement(ryaStatement, statements);
- } finally {
- statements.close();
- }
- }
-
- private void verifyFirstStatement(
- RyaStatement ryaStatement, CloseableIterable<RyaStatement> statements) {
- final Iterator<RyaStatement> iterator = statements.iterator();
- assertTrue(iterator.hasNext());
- final RyaStatement first = iterator.next();
- assertEquals(ryaStatement.getSubject(), first.getSubject());
- assertEquals(ryaStatement.getPredicate(), first.getPredicate());
- assertEquals(ryaStatement.getObject(), first.getObject());
- assertFalse(iterator.hasNext());
- }
-
- public void printTableData(String tableName)
- throws TableNotFoundException{
- Scanner scanner = connector.createScanner(tableName, auths);
- scanner.setRange(new Range());
- for(Map.Entry<Key, Value> entry : scanner) {
- final Key key = entry.getKey();
- final Value value = entry.getValue();
- System.out.println(key.getRow() + " " + key.getColumnFamily() + " " + key.getColumnQualifier() + " " + key.getTimestamp() + " " + value.toString());
- }
- }
-
-}