You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:35 UTC
[23/49] incubator-rya git commit: RYA-7 POM and License Clean-up for
Apache Move
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
deleted file mode 100644
index fea882d..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,326 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
-
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.Instance;
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.util.TextUtil;
-import com.google.common.base.Preconditions;
-import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.ntriples.NTriplesParserFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.Collection;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Take large ntrips files and use MapReduce and Cloudbase
- * Bulk ingest techniques to load into the table in our partition format.
- * <p/>
- * Input: NTrips file
- * Map:
- * - key : shard row - Text
- * - value : stmt in doc triple format - Text
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Cloudbase Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
- private static DateHashModShardValueGenerator generator = new DateHashModShardValueGenerator();
- public static final String BASE_MOD = "baseMod";
-
- @Override
- public int run(String[] args) throws Exception {
- Preconditions.checkArgument(args.length >= 7, "Usage: hadoop jar jarfile BulkNtripsInputTool <cb instance>" +
- " <zookeepers> <username> <password> <output table> <hdfs ntrips dir> <work dir> (<shard size>)");
-
- Configuration conf = getConf();
- PrintStream out = null;
- try {
- Job job = new Job(conf, "Bulk Ingest NTrips to Partition RDF");
- job.setJarByClass(this.getClass());
-
- //setting long job
- job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
- job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
- job.getConfiguration().set("io.sort.mb", "256");
-
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(ParseNtripsMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
-
- job.setCombinerClass(OutStmtMutationsReducer.class);
- job.setReducerClass(OutStmtMutationsReducer.class);
- job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
- CloudbaseFileOutputFormat.setZooKeeperInstance(job, args[0], args[1]);
-
- Instance instance = new ZooKeeperInstance(args[0], args[1]);
- String user = args[2];
- byte[] pass = args[3].getBytes();
- String tableName = args[4];
- String inputDir = args[5];
- String workDir = args[6];
- if(args.length > 7) {
- int baseMod = Integer.parseInt(args[7]);
- generator.setBaseMod(baseMod);
- job.getConfiguration().setInt(BASE_MOD, baseMod);
- }
-
- Connector connector = instance.getConnector(user, pass);
-
- TextInputFormat.setInputPaths(job, new Path(inputDir));
-
- FileSystem fs = FileSystem.get(conf);
- Path workPath = new Path(workDir);
- if (fs.exists(workPath))
- fs.delete(workPath, true);
-
- CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
- out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
- Collection<Text> splits = connector.tableOperations().getSplits(tableName, Integer.MAX_VALUE);
- for (Text split : splits)
- out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
- job.setNumReduceTasks(splits.size() + 1);
- out.close();
-
- job.setPartitionerClass(KeyRangePartitioner.class);
- RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
- job.waitForCompletion(true);
-
- connector.tableOperations().importDirectory(
- tableName,
- workDir + "/files",
- workDir + "/failures",
- 20,
- 4,
- false);
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (out != null)
- out.close();
- }
-
- return 0;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * input: ntrips format triple
- * <p/>
- * output: key: shard row from generator
- * value: stmt in serialized format (document format)
- */
- public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
- private static final NTriplesParserFactory N_TRIPLES_PARSER_FACTORY = new NTriplesParserFactory();
-
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private RDFParser parser;
- private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
-
- @Override
- protected void setup(final Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- generator.setBaseMod(conf.getInt(BASE_MOD, generator.getBaseMod()));
- parser = N_TRIPLES_PARSER_FACTORY.getParser();
- parser.setRDFHandler(new RDFHandler() {
-
- @Override
- public void startRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void endRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
- }
-
- @Override
- public void handleStatement(Statement statement) throws RDFHandlerException {
- try {
-// byte[] doc_serialized = writeStatement(statement, true);
- Text shard = new Text(generator.generateShardValue(statement.getSubject()));
-
- context.write(new Key(shard, DOC, new Text(writeStatement(statement, true))), EMPTY_VALUE);
- context.write(new Key(shard, INDEX, new Text(writeStatement(statement, false))), EMPTY_VALUE);
- //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once
- context.write(new Key(new Text(writeValue(statement.getSubject())), shard, EMPTY_TXT), EMPTY_VALUE);
-
-// outputKey.set(key);
-// outputValue.set(doc_serialized);
-// context.write(outputKey, outputValue);
-// outputKey.set(writeValue(statement.getSubject()));
-// outputValue.set(EMPTY_BYTE_ARRAY);
-// context.write(outputKey, outputValue);
- } catch (Exception e) {
- throw new RDFHandlerException(e);
- }
- }
-
- @Override
- public void handleComment(String s) throws RDFHandlerException {
-
- }
- });
- }
-
- @Override
- public void map(LongWritable key, Text value, Context output)
- throws IOException, InterruptedException {
- try {
- parser.parse(new StringReader(value.toString()), "");
- } catch (Exception e) {
- throw new IOException("Exception occurred parsing ntrips triple[" + value + "]");
- }
- }
- }
-
- public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
- public void reduce(Key key, Iterable<Value> values, Context output)
- throws IOException, InterruptedException {
- output.write(key, EMPTY_VALUE);
-// System.out.println(key);
-// for (Value value : values) {
-// System.out.println(value);
- /**
- * Each of these is a triple.
- * 1. format back to statement
- * 2. Output the doc,index key,value pairs for each triple
- */
-// Statement stmt = readStatement(ByteStreams.newDataInput(value.getBytes()), VALUE_FACTORY);
-// output.write(new Key(shardKey, DOC, new Text(writeStatement(stmt, true))), EMPTY_VALUE);
-// output.write(new Key(shardKey, INDEX, new Text(writeStatement(stmt, false))), EMPTY_VALUE);
-// //TODO: Wish we didn't have to do this constantly, probably better to just aggregate all subjects and do it once
-// output.write(new Key(new Text(writeValue(stmt.getSubject())), shardKey, EMPTY_TXT), EMPTY_VALUE);
-// }
- }
- }
-
- public static class EmbedKeyGroupingComparator implements RawComparator<Text> {
-
- public EmbedKeyGroupingComparator() {
-
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
- int arg5) {
- DataInputBuffer n = new DataInputBuffer();
-
- Text temp1 = new Text();
- Text temp2 = new Text();
-
- try {
- n.reset(arg0, arg1, arg2);
- temp1.readFields(n);
- n.reset(arg3, arg4, arg5);
- temp2.readFields(n);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- //e.printStackTrace();
- throw new RuntimeException(e);
- }
-
- return compare(temp1, temp2);
- }
-
- @Override
- public int compare(Text a1, Text a2) {
- return EmbedKeyRangePartitioner.retrieveEmbedKey(a1).compareTo(EmbedKeyRangePartitioner.retrieveEmbedKey(a2));
- }
-
- }
-
- /**
- * Really it does a normal Text compare
- */
- public static class EmbedKeySortComparator implements RawComparator<Text> {
-
- public EmbedKeySortComparator() {
-
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
- int arg5) {
- DataInputBuffer n = new DataInputBuffer();
-
- Text temp1 = new Text();
- Text temp2 = new Text();
-
- try {
- n.reset(arg0, arg1, arg2);
- temp1.readFields(n);
- n.reset(arg3, arg4, arg5);
- temp2.readFields(n);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- //e.printStackTrace();
- throw new RuntimeException(e);
- }
-
- return compare(temp1, temp2);
- }
-
- @Override
- public int compare(Text a1, Text a2) {
- return a1.compareTo(a2);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
deleted file mode 100644
index f72c382..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitioner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
-
-import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Class EmbedKeyRangePartitioner
- * Date: Sep 13, 2011
- * Time: 1:49:35 PM
- */
-public class EmbedKeyRangePartitioner extends RangePartitioner {
- @Override
- public int getPartition(Text key, Writable value, int numPartitions) {
- Text embedKey = retrieveEmbedKey(key);
- return super.getPartition(embedKey, value, numPartitions);
- }
-
- public static Text retrieveEmbedKey(Text key) {
- int split = key.find(PartitionConstants.INDEX_DELIM_STR);
- if (split < 0)
- return key;
- Text newText = new Text();
- newText.append(key.getBytes(), 0, split);
- return newText;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
deleted file mode 100644
index a83d594..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/iterators/SortedEncodedRangeIterator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.iterators;
-
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.iterators.IteratorEnvironment;
-import cloudbase.core.iterators.SortedKeyValueIterator;
-import cloudbase.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Text;
-import ss.cloudbase.core.iterators.SortedRangeIterator;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Class SortedEncodedRangeIterator
- * Date: Sep 8, 2011
- * Time: 6:01:28 PM
- */
-public class SortedEncodedRangeIterator extends SortedRangeIterator {
-
- @Override
- public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
- if (options.containsKey(OPTION_LOWER_BOUND)) {
- lower = new Text(decode(options.get(OPTION_LOWER_BOUND)));
- } else {
- lower = new Text("\u0000");
- }
-
- if (options.containsKey(OPTION_UPPER_BOUND)) {
- upper = new Text(decode(options.get(OPTION_UPPER_BOUND)));
- } else {
- upper = new Text("\u0000");
- }
- }
-
- public static String encode(String txt) {
- return new String(Base64.encodeBase64(txt.getBytes()));
- }
-
- public static String decode(String txt) {
- return new String(Base64.decodeBase64(txt.getBytes()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
deleted file mode 100644
index e360ca7..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectCombiner.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Since each subject is located at most on one tablet, we should be able to assume that
- * no reducer is needed. The Combine phase should aggregate properly.
- * <p/>
- * Class AggregateTriplesBySubjectReducer
- * Date: Sep 1, 2011
- * Time: 5:39:24 PM
- */
-public class AggregateTriplesBySubjectCombiner extends Reducer<Text, MapWritable, Text, MapWritable> {
-// private LongWritable lwout = new LongWritable();
- private MapWritable mwout = new MapWritable();
-
- @Override
- protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
- for (MapWritable value : values) {
- for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
- mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()),
- WritableUtils.clone(entry.getValue(), context.getConfiguration()));
- }
- }
- context.write(key, mwout);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
deleted file mode 100644
index 2ea5fa8..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/AggregateTriplesBySubjectReducer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.SELECT_FILTER;
-
-/**
- * Since each subject is located at most on one tablet, we should be able to assume that
- * no reducer is needed. The Combine phase should aggregate properly.
- * <p/>
- * Class AggregateTriplesBySubjectReducer
- * Date: Sep 1, 2011
- * Time: 5:39:24 PM
- */
-public class AggregateTriplesBySubjectReducer extends Reducer<Text, MapWritable, LongWritable, MapWritable> {
- private LongWritable lwout = new LongWritable();
- private MapWritable mwout = new MapWritable();
-
- @Override
- protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
- for (MapWritable value : values) {
- for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
- mwout.put(WritableUtils.clone(entry.getKey(), context.getConfiguration()),
- WritableUtils.clone(entry.getValue(), context.getConfiguration()));
- }
- }
- lwout.set(key.hashCode());
- context.write(lwout, mwout);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
deleted file mode 100644
index 0630501..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/KeyValueToMapWrMapper.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.openrdf.model.Statement;
-
-import java.io.IOException;
-import java.util.*;
-
-import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*;
-
-/**
- * Will take a triple and output: <subject, predObj map>
- * <p/>
- * Class KeyValueToMapWrMapper
- * Date: Sep 1, 2011
- * Time: 4:56:42 PM
- */
-public class KeyValueToMapWrMapper extends Mapper<Key, Value, Text, MapWritable> {
-
-// private List<String> predicateFilter = new ArrayList<String>();
-
- private Text subjNameTxt;
- private Text keyout = new Text();
- private Text predout = new Text();
- private Text objout = new Text();
-
- private Map<String, String> predValueName = new HashMap();
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- //find the values to filter on
- Configuration conf = context.getConfiguration();
- String[] filter = conf.getStrings(SELECT_FILTER);
- if (filter != null) {
- for (String predValue : filter) {
- String predName = conf.get(predValue);
- if (predName != null)
- predValueName.put(predValue, predName);
- }
- }
-
- String subjName = conf.get(SUBJECT_NAME);
- if (subjName != null) {
- //not sure it will ever be null
- subjNameTxt = new Text(subjName);
- }
- }
-
- @Override
- protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
- Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(key.getColumnQualifier().getBytes()), PartitionConstants.VALUE_FACTORY);
- String predName = predValueName.get(stmt.getPredicate().stringValue());
- if (predName == null)
- return;
-
- keyout.set(stmt.getSubject().stringValue());
- predout.set(predName);
- objout.set(stmt.getObject().stringValue());
- MapWritable mw = new MapWritable();
- mw.put(predout, objout);
- if (subjNameTxt != null) {
- mw.put(subjNameTxt, keyout);
- }
- context.write(keyout, mw);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
deleted file mode 100644
index 56014f9..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFJob.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-import cloudbase.core.util.ArgumentChecker;
-import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor;
-import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer;
-import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.QueryModelNode;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParserFactory;
-
-/**
- * Class SparqlCloudbaseIFJob
- * Date: Sep 1, 2011
- * Time: 6:04:35 PM
- */
-public class SparqlCloudbaseIFJob {
-
- private String[] queries;
- private String table;
-
- //Cloudbase properties
- private String userName;
- private String pwd;
- private String instance;
- private String zk;
- //
-
- private Class classOriginal; //Calling class for this job.
- private String outputPath;
-
- public SparqlCloudbaseIFJob(String table, String userName, String pwd, String instance, String zk,
- String outputPath, Class classOriginal, String... queries) {
- ArgumentChecker.notNull(queries);
- this.queries = queries;
- this.table = table;
- this.userName = userName;
- this.pwd = pwd;
- this.instance = instance;
- this.zk = zk;
- this.outputPath = outputPath;
- this.classOriginal = classOriginal;
- }
-
- public String[] run() throws Exception {
- int count = 0;
- outputPath = outputPath + "/results/";
- String[] resultsOut = new String[queries.length];
-
- for (String query : queries) {
- QueryParser parser = (new SPARQLParserFactory()).getParser();
- TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr();
-
- final Configuration queryConf = new Configuration();
- expr.visit(new FilterTimeIndexVisitor(queryConf));
-
- (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null);
-
- //make sure of only one shardlookup
- expr.visit(new QueryModelVisitorBase<RuntimeException>() {
- int count = 0;
-
- @Override
- public void meetOther(QueryModelNode node) throws RuntimeException {
- super.meetOther(node);
- count++;
- if (count > 1)
- throw new IllegalArgumentException("Query can only have one subject-star lookup");
- }
- });
-
- final Job job = new Job(queryConf);
- job.setJarByClass(classOriginal);
- job.setJobName("SparqlCloudbaseIFTransformer. Query: " + ((query.length() > 32) ? (query.substring(0, 32)) : (query)));
-
- expr.visit(new QueryModelVisitorBase<RuntimeException>() {
- @Override
- public void meetOther(QueryModelNode node) throws RuntimeException {
- super.meetOther(node);
-
- //set up CloudbaseBatchScannerInputFormat here
- if (node instanceof ShardSubjectLookup) {
- System.out.println("Lookup: " + node);
- try {
- new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, table,
- userName, pwd, instance, zk);
- } catch (QueryEvaluationException e) {
- e.printStackTrace();
- }
- }
- }
- });
-
-
- String resultOutPath = outputPath + "/result-" + count;
- resultsOut[count] = resultOutPath;
- Path outputDir = new Path(resultOutPath);
- FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf);
- if (dfs.exists(outputDir))
- dfs.delete(outputDir, true);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setOutputPath(job, outputDir);
-
-
- // Submit the job
- job.waitForCompletion(true);
- count++;
- }
- return resultsOut;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
deleted file mode 100644
index 38c9ea5..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformer.java
+++ /dev/null
@@ -1,331 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.client.TableNotFoundException;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Range;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import mvm.rya.cloudbase.utils.input.CloudbaseBatchScannerInputFormat;
-import mvm.mmrts.rdf.partition.mr.iterators.SortedEncodedRangeIterator;
-import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
-import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.Var;
-import ss.cloudbase.core.iterators.GMDenIntersectingIterator;
-import ss.cloudbase.core.iterators.SortedRangeIterator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-import static mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFTransformerConstants.*;
-
-/**
- * Class SparqlCloudbaseIFTransformer
- * Date: Sep 1, 2011
- * Time: 11:28:48 AM
- */
-public class SparqlCloudbaseIFTransformer {
-
- protected Job job;
-
- protected String userName;
- protected String pwd;
- protected String instance;
- protected String zk;
-
- protected ShardSubjectLookup lookup;
-// protected Configuration configuration;
- protected String table;
-
- protected DateHashModShardValueGenerator generator;
-
- public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table,
- String userName, String pwd, String instance, String zk) throws QueryEvaluationException {
- this(lookup, configuration, job, table, userName, pwd, instance, zk, new DateHashModShardValueGenerator());
- }
-
- public SparqlCloudbaseIFTransformer(ShardSubjectLookup lookup, Configuration configuration, Job job, String table,
- String userName, String pwd, String instance, String zk, DateHashModShardValueGenerator generator) throws QueryEvaluationException {
- this.lookup = lookup;
-// this.configuration = configuration;
- this.table = table;
- this.job = job;
- this.userName = userName;
- this.pwd = pwd;
- this.instance = instance;
- this.zk = zk;
- this.generator = generator;
-
- this.initialize();
- }
-
-
- public void initialize() throws QueryEvaluationException {
- try {
- /**
- * Here we will set up the BatchScanner based on the lookup
- */
- Var subject = lookup.getSubject();
- List<Map.Entry<Var, Var>> where = retrieveWhereClause();
- List<Map.Entry<Var, Var>> select = retrieveSelectClause();
-
- //global start-end time
- long start = job.getConfiguration().getLong(START_BINDING, 0);
- long end = job.getConfiguration().getLong(END_BINDING, System.currentTimeMillis());
-
- int whereSize = where.size() + ((!isTimeRange(lookup, job.getConfiguration())) ? 0 : 1);
-
- if (subject.hasValue()
- && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */
- && select.size() == 0) {
- /**
- * Case 1: Subject is set, but predicate, object are not.
- * Return all for the subject
- */
-// this.scanner = scannerForSubject((URI) subject.getValue());
-// if (this.scanner == null) {
-// this.iter = new EmptyIteration();
-// return;
-// }
-// Map.Entry<Var, Var> predObj = lookup.getPredicateObjectPairs().get(0);
-// this.iter = new SelectAllIterator(this.bindings, this.scanner.iterator(), predObj.getKey(), predObj.getValue());
- throw new UnsupportedOperationException("Query Case not supported");
- } else if (subject.hasValue()
- && where.size() == 0 /* Not using whereSize, because we can set up the TimeRange in the scanner */) {
- /**
- * Case 2: Subject is set, and a few predicates are set, but no objects
- * Return all, and filter which predicates you are interested in
- */
-// this.scanner = scannerForSubject((URI) subject.getValue());
-// if (this.scanner == null) {
-// this.iter = new EmptyIteration();
-// return;
-// }
-// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
- throw new UnsupportedOperationException("Query Case not supported");
- } else if (subject.hasValue()
- && where.size() >= 1 /* Not using whereSize, because we can set up the TimeRange in the scanner */) {
- /**
- * Case 2a: Subject is set, and a few predicates are set, and one object
- * TODO: For now we will ignore the predicate-object filter because we do not know how to query for this
- */
-// this.scanner = scannerForSubject((URI) subject.getValue());
-// if (this.scanner == null) {
-// this.iter = new EmptyIteration();
-// return;
-// }
-// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
- throw new UnsupportedOperationException("Query Case not supported");
- } else if (!subject.hasValue() && whereSize > 1) {
- /**
- * Case 3: Subject is not set, more than one where clause
- */
- scannerForPredicateObject(lookup, start, end, where);
- setSelectFilter(subject, select);
- } else if (!subject.hasValue() && whereSize == 1) {
- /**
- * Case 4: No subject, only one where clause
- */
- Map.Entry<Var, Var> predObj = null;
- if (where.size() == 1) {
- predObj = where.get(0);
- }
- scannerForPredicateObject(lookup, start, end, predObj);
- setSelectFilter(subject, select);
- } else if (!subject.hasValue() && whereSize == 0 && select.size() > 1) {
- /**
- * Case 5: No subject, no where (just 1 select)
- */
-// this.scanner = scannerForPredicates(start, end, select);
-// if (this.scanner == null) {
-// this.iter = new EmptyIteration();
-// return;
-// }
-// this.iter = new FilterIterator(this.bindings, this.scanner.iterator(), subject, select);
- throw new UnsupportedOperationException("Query Case not supported");
- } else if (!subject.hasValue() && whereSize == 0 && select.size() == 1) {
- /**
- * Case 5: No subject, no where (just 1 select)
- */
-// cloudbase.core.client.Scanner sc = scannerForPredicate(start, end, (URI) select.get(0).getKey().getValue());
-// if (sc == null) {
-// this.iter = new EmptyIteration();
-// return;
-// }
-// this.iter = new FilterIterator(this.bindings, sc.iterator(), subject, select);
- throw new UnsupportedOperationException("Query Case not supported");
- } else {
- throw new QueryEvaluationException("Case not supported as of yet");
- }
-
- } catch (Exception e) {
- throw new QueryEvaluationException(e);
- }
- }
-
- protected void setSelectFilter(Var subj, List<Map.Entry<Var, Var>> select) {
- List<String> selectStrs = new ArrayList<String>();
- for (Map.Entry<Var, Var> entry : select) {
- Var key = entry.getKey();
- Var obj = entry.getValue();
- if (key.hasValue()) {
- String pred_s = key.getValue().stringValue();
- selectStrs.add(pred_s);
- job.getConfiguration().set(pred_s, obj.getName());
- }
- }
- job.getConfiguration().setStrings(SELECT_FILTER, selectStrs.toArray(new String[selectStrs.size()]));
- job.getConfiguration().set(SUBJECT_NAME, subj.getName());
- }
-
- protected List<Map.Entry<Var, Var>> retrieveWhereClause() {
- List<Map.Entry<Var, Var>> where = new ArrayList<Map.Entry<Var, Var>>();
- for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
- Var pred = entry.getKey();
- Var object = entry.getValue();
- if (pred.hasValue() && object.hasValue()) {
- where.add(entry); //TODO: maybe we should clone this?
- }
- }
- return where;
- }
-
- protected List<Map.Entry<Var, Var>> retrieveSelectClause() {
- List<Map.Entry<Var, Var>> select = new ArrayList<Map.Entry<Var, Var>>();
- for (Map.Entry<Var, Var> entry : lookup.getPredicateObjectPairs()) {
- Var pred = entry.getKey();
- Var object = entry.getValue();
- if (pred.hasValue() && !object.hasValue()) {
- select.add(entry); //TODO: maybe we should clone this?
- }
- }
- return select;
- }
-
- protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, List<Map.Entry<Var, Var>> predObjs) throws IOException, TableNotFoundException {
- start = validateFillStartTime(start, lookup);
- end = validateFillEndTime(end, lookup);
-
- int extra = 0;
-
- if (isTimeRange(lookup, job.getConfiguration())) {
- extra += 1;
- }
-
- Text[] queries = new Text[predObjs.size() + extra];
- for (int i = 0; i < predObjs.size(); i++) {
- Map.Entry<Var, Var> predObj = predObjs.get(i);
- ByteArrayDataOutput output = ByteStreams.newDataOutput();
- writeValue(output, predObj.getKey().getValue());
- output.write(INDEX_DELIM);
- writeValue(output, predObj.getValue().getValue());
- queries[i] = new Text(output.toByteArray());
- }
-
- if (isTimeRange(lookup, job.getConfiguration())) {
- queries[queries.length - 1] = new Text(
- GMDenIntersectingIterator.getRangeTerm(INDEX.toString(),
- getStartTimeRange(lookup, job.getConfiguration())
- , true,
- getEndTimeRange(lookup, job.getConfiguration()),
- true
- )
- );
- }
-
- createBatchScannerInputFormat();
- CloudbaseBatchScannerInputFormat.setIterator(job, 20, GMDenIntersectingIterator.class.getName(), "ii");
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.docFamilyOptionName, DOC.toString());
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.indexFamilyOptionName, INDEX.toString());
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.columnFamiliesOptionName, GMDenIntersectingIterator.encodeColumns(queries));
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ii", GMDenIntersectingIterator.OPTION_MULTI_DOC, "" + true);
-
- Range range = new Range(
- new Key(new Text(generator.generateShardValue(start, null) + "\0")),
- new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD"))
- );
- CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton(
- range
- ));
- }
-
- protected void scannerForPredicateObject(ShardSubjectLookup lookup, Long start, Long end, Map.Entry<Var, Var> predObj) throws IOException, TableNotFoundException {
- start = validateFillStartTime(start, lookup);
- end = validateFillEndTime(end, lookup);
-
- /**
- * Need to use GMDen because SortedRange can't serialize non xml characters in range
- * @see https://issues.apache.org/jira/browse/MAPREDUCE-109
- */
- createBatchScannerInputFormat();
- CloudbaseBatchScannerInputFormat.setIterator(job, 20, SortedEncodedRangeIterator.class.getName(), "ri");
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_DOC_COLF, DOC.toString());
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_COLF, INDEX.toString());
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_START_INCLUSIVE, "" + true);
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_END_INCLUSIVE, "" + true);
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_MULTI_DOC, "" + true);
-
- String lower, upper = null;
- if (isTimeRange(lookup, job.getConfiguration())) {
- lower = getStartTimeRange(lookup, job.getConfiguration());
- upper = getEndTimeRange(lookup, job.getConfiguration());
- } else {
-
- ByteArrayDataOutput output = ByteStreams.newDataOutput();
- writeValue(output, predObj.getKey().getValue());
- output.write(INDEX_DELIM);
- writeValue(output, predObj.getValue().getValue());
-
- lower = new String(output.toByteArray());
- upper = lower + "\01";
- }
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_LOWER_BOUND, SortedEncodedRangeIterator.encode(lower));
- CloudbaseBatchScannerInputFormat.setIteratorOption(job, "ri", SortedRangeIterator.OPTION_UPPER_BOUND, SortedEncodedRangeIterator.encode(upper));
-
- //TODO: Do we add a time predicate to this?
-// bs.setScanIterators(19, FilteringIterator.class.getName(), "filteringIterator");
-// bs.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName());
-// bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, (end - start) + "");
-// bs.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, end + "");
-
- Range range = new Range(
- new Key(new Text(generator.generateShardValue(start, null) + "\0")),
- new Key(new Text(generator.generateShardValue(end, null) + "\uFFFD"))
- );
- CloudbaseBatchScannerInputFormat.setRanges(job, Collections.singleton(
- range
- ));
-
- }
-
- protected void createBatchScannerInputFormat() {
- job.setInputFormatClass(CloudbaseBatchScannerInputFormat.class);
- CloudbaseBatchScannerInputFormat.setInputInfo(job, userName, pwd.getBytes(), table, CBConstants.NO_AUTHS); //may need to change these auths sometime soon
- CloudbaseBatchScannerInputFormat.setZooKeeperInstance(job, instance, zk);
- job.setMapperClass(KeyValueToMapWrMapper.class);
- job.setCombinerClass(AggregateTriplesBySubjectCombiner.class);
- job.setReducerClass(AggregateTriplesBySubjectReducer.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(MapWritable.class);
- job.setOutputKeyClass(LongWritable.class);
- job.setOutputValueClass(MapWritable.class);
-
- job.getConfiguration().set("io.sort.mb", "256");
- job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
- job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
deleted file mode 100644
index 84f83c0..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/transform/SparqlCloudbaseIFTransformerConstants.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.transform;
-
-/**
- * Class SparqlCloudbaseIFTransformerConstants
- * Date: Sep 1, 2011
- * Time: 5:01:10 PM
- */
-public class SparqlCloudbaseIFTransformerConstants {
- public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.transform.";
- public static final String SELECT_FILTER = PREFIX + "select";
- public static final String SUBJECT_NAME = PREFIX + "subject";
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
deleted file mode 100644
index effb9ff..0000000
--- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatToolTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.compat;
-
-import junit.framework.TestCase;
-
-/**
- * Class ChangeShardDateFormatToolTest
- * Date: Dec 9, 2011
- * Time: 10:39:31 AM
- */
-public class ChangeShardDateFormatToolTest extends TestCase {
-
- public void testShardDelim() throws Exception {
- String dateDelim = "-";
- String shard = "2011-11-01";
- int shardIndex = shard.lastIndexOf(dateDelim);
- if (shardIndex == -1)
- fail();
- String date = shard.substring(0, shardIndex);
- shard = shard.substring(shardIndex + 1, shard.length());
- assertEquals("2011-11", date);
- assertEquals("01", shard);
-
- dateDelim = "_";
- shard = "20111101_33";
- shardIndex = shard.lastIndexOf(dateDelim);
- if (shardIndex == -1)
- fail();
- date = shard.substring(0, shardIndex);
- shard = shard.substring(shardIndex + 1, shard.length());
- assertEquals("20111101", date);
- assertEquals("33", shard);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
deleted file mode 100644
index c279348..0000000
--- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToolTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.data.ColumnUpdate;
-import cloudbase.core.data.Mutation;
-import junit.framework.TestCase;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.zookeeper.ZooKeeper;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.StatementImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Class RdfFileInputToolTest
- * Date: Aug 8, 2011
- * Time: 3:22:25 PM
- */
-public class RdfFileInputToolTest extends TestCase {
-
- ValueFactory vf = ValueFactoryImpl.getInstance();
-
- /**
- * MRUnit for latest mapreduce (0.21 api)
- * <p/>
- * 1. Test to see if the bytes overwrite will affect
- */
-
- private Mapper<LongWritable, BytesWritable, Text, BytesWritable> mapper = new RdfFileInputToCloudbaseTool.OutSubjStmtMapper();
- private Reducer<Text, BytesWritable, Text, Mutation> reducer = new RdfFileInputToCloudbaseTool.StatementToMutationReducer();
- private MapReduceDriver<LongWritable, BytesWritable, Text, BytesWritable, Text, Mutation> driver;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- driver = new MapReduceDriver(mapper, reducer);
- Configuration conf = new Configuration();
- conf.set(RdfFileInputToCloudbaseTool.CB_TABLE_PROP, "table");
- driver.setConfiguration(conf);
- }
-
- public void testNormalRun() throws Exception {
- StatementImpl stmt1 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("object"));
- StatementImpl stmt2 = new StatementImpl(vf.createURI("urn:namespace#subject"), vf.createURI("urn:namespace#pred"), vf.createLiteral("obje"));
- StatementImpl stmt3 = new StatementImpl(vf.createURI("urn:namespace#subj2"), vf.createURI("urn:namespace#pred"), vf.createLiteral("ob"));
- List<Pair<Text, Mutation>> pairs = driver.
- withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt1, true))).
- withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt2, true))).
- withInput(new LongWritable(1), new BytesWritable(RdfIO.writeStatement(stmt3, true))).
- run();
-
- assertEquals(4, pairs.size());
-
- ColumnUpdate update = pairs.get(0).getSecond().getUpdates().get(0);
- assertEquals("event", new String(update.getColumnFamily()));
- assertEquals("\07urn:namespace#subj2\0\07urn:namespace#pred\0\u0009ob", new String(update.getColumnQualifier()));
- }
-
- public static void main(String[] args) {
- try {
- Connector connector = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes());
- Collection<Text> splits = connector.tableOperations().getSplits("partitionRdf", Integer.MAX_VALUE);
- System.out.println(splits.size());
- System.out.println(splits);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java b/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
deleted file mode 100644
index bd63f6f..0000000
--- a/partition/mr.partition.rdf/src/test/java/mvm/mmrts/rdf/partition/mr/fileinput/bulk/EmbedKeyRangePartitionerTest.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput.bulk;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.io.Text;
-
-/**
- * Class EmbedKeyRangePartitionerTest
- * Date: Sep 13, 2011
- * Time: 1:58:28 PM
- */
-public class EmbedKeyRangePartitionerTest extends TestCase {
-
- public void testRetrieveEmbedKey() throws Exception {
- assertEquals(new Text("hello"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello\1there")));
- assertEquals(new Text("h"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("h\1there")));
- assertEquals(new Text(""), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("\1there")));
- assertEquals(new Text("hello there"), EmbedKeyRangePartitioner.retrieveEmbedKey(new Text("hello there")));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/pom.xml
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/pom.xml b/partition/partition.rdf/pom.xml
deleted file mode 100644
index 2701d64..0000000
--- a/partition/partition.rdf/pom.xml
+++ /dev/null
@@ -1,281 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>mvm.rya</groupId>
- <artifactId>parent</artifactId>
- <version>3.0.0.alpha1</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>mvm.mmrts.rdf</groupId>
- <artifactId>partition.rdf</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <name>${project.groupId}.${project.artifactId}</name>
-
- <dependencies>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-runtime</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryresultio-sparqlxml</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-rdfxml</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>sitestore.common</groupId>
- <artifactId>common-query</artifactId>
- </dependency>
- <dependency>
- <groupId>mvm.rya</groupId>
- <artifactId>cloudbase.utils</artifactId>
- </dependency>
-
- <!-- Cloudbase deps -->
- <dependency>
- <groupId>cloudbase</groupId>
- <artifactId>cloudbase-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
-
- <!-- Test -->
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- Deps that are transitive but listed anyway
-
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-model</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-query</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryalgebra-model</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryparser-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryparser-serql</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryparser-sparql</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryparser-serql</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryresultio-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryresultio-binary</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryresultio-sparqljson</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryresultio-text</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-manager</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-event</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-sail</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-sail-memory</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-sail-inferencer</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-queryalgebra-evaluation</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-http</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-http-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-contextaware</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-repository-dataset</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-http-protocol</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-ntriples</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-n3</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-trix</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-turtle</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-rio-trig</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-sail-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-sail-nativerdf</artifactId>
- </dependency>
- <dependency>
- <groupId>org.openrdf.sesame</groupId>
- <artifactId>sesame-sail-rdbms</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-collections</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-iteration</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-lang</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-i18n</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-concurrent</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-xml</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-text</artifactId>
- </dependency>
- <dependency>
- <groupId>info.aduna.commons</groupId>
- <artifactId>aduna-commons-net</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-dbcp</groupId>
- <artifactId>commons-dbcp</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency> -->
-
- </dependencies>
- <repositories>
- <repository>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- <id>aduna-opensource.releases</id>
- <name>Aduna Open Source - Maven releases</name>
- <url>http://repo.aduna-software.org/maven2/releases</url>
- </repository>
- </repositories>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/*IntegrationTest.java
- </exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
deleted file mode 100644
index 0c723a1..0000000
--- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/InvalidValueTypeMarkerRuntimeException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package mvm.mmrts.rdf.partition;
-
-/**
- * Class InvalidValueTypeMarkerRuntimeException
- * Date: Jan 7, 2011
- * Time: 12:58:27 PM
- */
-public class InvalidValueTypeMarkerRuntimeException extends RuntimeException {
- private int valueTypeMarker = -1;
-
- public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker) {
- super();
- this.valueTypeMarker = valueTypeMarker;
- }
-
- public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s) {
- super(s);
- this.valueTypeMarker = valueTypeMarker;
- }
-
- public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, String s, Throwable throwable) {
- super(s, throwable);
- this.valueTypeMarker = valueTypeMarker;
- }
-
- public InvalidValueTypeMarkerRuntimeException(int valueTypeMarker, Throwable throwable) {
- super(throwable);
- this.valueTypeMarker = valueTypeMarker;
- }
-
- public int getValueTypeMarker() {
- return valueTypeMarker;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
deleted file mode 100644
index 83e0675..0000000
--- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConnection.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package mvm.mmrts.rdf.partition;
-
-import cloudbase.core.client.BatchWriter;
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.Scanner;
-import cloudbase.core.client.admin.TableOperations;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.data.Range;
-import cloudbase.core.security.ColumnVisibility;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import info.aduna.iteration.CloseableIteration;
-import mvm.mmrts.rdf.partition.converter.ContextColVisConverter;
-import mvm.mmrts.rdf.partition.iterators.NamespaceIterator;
-import mvm.mmrts.rdf.partition.query.evaluation.FilterTimeIndexVisitor;
-import mvm.mmrts.rdf.partition.query.evaluation.PartitionEvaluationStrategy;
-import mvm.mmrts.rdf.partition.query.evaluation.SubjectGroupingOptimizer;
-import mvm.mmrts.rdf.partition.shard.ShardValueGenerator;
-import mvm.mmrts.rdf.partition.utils.ContextsStatementImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.openrdf.model.*;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.Dataset;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.QueryRoot;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.impl.EmptyBindingSet;
-import org.openrdf.sail.SailException;
-import org.openrdf.sail.helpers.SailConnectionBase;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Class PartitionConnection
- * Date: Jul 6, 2011
- * Time: 4:40:49 PM
- * <p/>
- * Ingest:
- * Triple ->
- * - <subject> <shard>:
- * - <shard> event:<subject>\0<predicate>\0<object>
- * - <shard> index:<predicate>\1<object>\0
- * <p/>
- * Namespace ->
- * - <prefix> ns:<namespace>
- */
/**
 * Class PartitionConnection
 * Date: Jul 6, 2011
 * Time: 4:40:49 PM
 * <p/>
 * Sail connection over a shard-partitioned Cloudbase layout. Statements added
 * through this connection are buffered in memory and written out in
 * {@link #commitInternal()} using the row layout below.
 * <p/>
 * Ingest:
 * Triple ->
 * - <subject> <shard>:
 * - <shard> event:<subject>\0<predicate>\0<object>
 * - <shard> index:<predicate>\1<object>\0
 * <p/>
 * Namespace ->
 * - <prefix> ns:<namespace>
 */
public class PartitionConnection extends SailConnectionBase {

    private PartitionSail sail;
    // Writer for the main partition table (event/index entries keyed by shard).
    private BatchWriter writer;
    // Writer for the subject -> shard lookup table. MMRTS-148
    private BatchWriter shardTableWriter;

    // Statements buffered since the last commit/rollback, grouped by subject so
    // commitInternal can compute one shard value per subject.
    private Multimap<Resource, ContextsStatementImpl> statements = HashMultimap.create(10000, 10);


    /**
     * Opens a connection against the given sail, creating the backing tables
     * and batch writers eagerly.
     *
     * @param sailBase owning sail holding the connector, table names, and config
     * @throws SailException if the tables or writers cannot be created
     */
    public PartitionConnection(PartitionSail sailBase) throws SailException {
        super(sailBase);
        this.sail = sailBase;
        this.initialize();
    }

    /**
     * Creates the partition table and shard lookup table if absent, then opens
     * a batch writer for each.
     *
     * @throws SailException wrapping any table/writer creation failure
     */
    protected void initialize() throws SailException {
        try {
            Connector connector = sail.connector;
            String table = sail.table;
            String shardTable = sail.shardTable;

            //create these tables if they do not exist
            TableOperations tableOperations = connector.tableOperations();
            boolean tableExists = tableOperations.exists(table);
            if (!tableExists)
                tableOperations.create(table);

            tableExists = tableOperations.exists(shardTable);
            if(!tableExists)
                tableOperations.create(shardTable);

            // 1MB buffer, 60s max latency, 10 writer threads for each table.
            writer = connector.createBatchWriter(table, 1000000l, 60000l, 10);
            shardTableWriter = connector.createBatchWriter(shardTable, 1000000l, 60000l, 10);
        } catch (Exception e) {
            throw new SailException(e);
        }
    }

    /**
     * Closes both batch writers; any un-flushed mutations are flushed by the
     * writers on close.
     */
    @Override
    protected void closeInternal() throws SailException {
        try {
            writer.close();
            shardTableWriter.close();
        } catch (Exception e) {
            throw new SailException(e);
        }
    }

    /**
     * Evaluates a tuple query: wraps the expression in a QueryRoot if needed,
     * folds the binding set into a per-query Configuration, applies the
     * time-index filter visitor and subject-grouping optimizer, then delegates
     * to the partition evaluation strategy.
     *
     * @param b inference flag (unused by this implementation)
     */
    @Override
    protected CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluateInternal(TupleExpr tupleExpr, Dataset dataset, BindingSet bindingSet, boolean b) throws SailException {
// throw new UnsupportedOperationException("Query not supported");

        if (!(tupleExpr instanceof QueryRoot))
            tupleExpr = new QueryRoot(tupleExpr);

        try {
            Configuration queryConf = populateConf(bindingSet);
            //timeRange filter check
            tupleExpr.visit(new FilterTimeIndexVisitor(queryConf));

            (new SubjectGroupingOptimizer(queryConf)).optimize(tupleExpr, dataset, bindingSet);
            PartitionTripleSource source = new PartitionTripleSource(this.sail, queryConf);

            PartitionEvaluationStrategy strategy = new PartitionEvaluationStrategy(
                    source, dataset);

            // NOTE: evaluates against an empty binding set; the incoming
            // bindings are carried via queryConf instead.
            return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance());
        } catch (Exception e) {
            throw new SailException(e);
        }

    }

    /**
     * Builds a query-scoped Configuration layered on the sail's config: every
     * binding is copied in as a string property, and the well-known time
     * bindings (start/end range, time predicate, time type) are promoted to
     * their typed properties. When a time predicate is present but no time
     * type is bound, the type defaults to XMLDATETIME.
     */
    protected Configuration populateConf(BindingSet bs) {
        Configuration conf = new Configuration(this.sail.conf);

        for (String bname : bs.getBindingNames()) {
            conf.set(bname, bs.getValue(bname).stringValue());
        }
        Binding start = bs.getBinding(START_BINDING);
        if (start != null)
            conf.setLong(START_BINDING, Long.parseLong(start.getValue().stringValue()));

        Binding end = bs.getBinding(END_BINDING);
        if (end != null)
            conf.setLong(END_BINDING, Long.parseLong(end.getValue().stringValue()));

        Binding timePredicate = bs.getBinding(TIME_PREDICATE);
        if (timePredicate != null)
            conf.set(TIME_PREDICATE, timePredicate.getValue().stringValue());

        Binding timeType = bs.getBinding(TIME_TYPE_PROP);
        if (timeType != null)
            conf.set(TIME_TYPE_PROP, timeType.getValue().stringValue());
        else if (timePredicate != null)
            conf.set(TIME_TYPE_PROP, TimeType.XMLDATETIME.name()); //default to xml datetime

        return conf;
    }

    /** Not supported by this store. */
    @Override
    protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException {
        throw new UnsupportedOperationException("Contexts not supported");
    }

    /** Not supported; queries must go through evaluateInternal. */
    @Override
    protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource resource, URI uri, Value value, boolean b, Resource... resources) throws SailException {
        throw new UnsupportedOperationException("Query not supported");
    }

    /** Not supported by this store. */
    @Override
    protected long sizeInternal(Resource... resources) throws SailException {
        throw new UnsupportedOperationException("Size operation not supported");
    }

    @Override
    protected void startTransactionInternal() throws SailException {
        // no transaction support as of yet
    }

    /**
     * Writes all buffered statements. For each subject: a shard value is
     * generated, every statement becomes an event and an index entry in a
     * single mutation on the shard row (with a column visibility derived from
     * the statement's contexts when a converter is configured), and one
     * subject -> shard pointer is written to the lookup table. Both writers
     * are flushed and the buffer cleared on success.
     */
    @Override
    protected void commitInternal() throws SailException {
        try {
            ShardValueGenerator gen = sail.generator;
            ContextColVisConverter contextColVisConverter = sail.contextColVisConverter;
            Map<Resource, Collection<ContextsStatementImpl>> map = statements.asMap();
            for (Map.Entry<Resource, Collection<ContextsStatementImpl>> entry : map.entrySet()) {
                Resource subject = entry.getKey();
                byte[] subj_bytes = writeValue(subject);
                String shard = gen.generateShardValue(subject);
                Text shard_txt = new Text(shard);
                Collection<ContextsStatementImpl> stmts = entry.getValue();

                /**
                 * Triple - >
                 *- < subject ><shard >:
                 *- < shard > event:<subject >\0 < predicate >\0 < object >
                 *- < shard > index:<predicate >\1 < object >\0
                 */
                Mutation m_subj = new Mutation(shard_txt);
                for (ContextsStatementImpl stmt : stmts) {
                    Resource[] contexts = stmt.getContexts();
                    ColumnVisibility vis = null;
                    if (contexts != null && contexts.length > 0 && contextColVisConverter != null) {
                        vis = contextColVisConverter.convertContexts(contexts);
                    }

                    // Each statement is written twice: once under the "event"
                    // family (subject-first encoding) and once under "index"
                    // (predicate/object-first encoding).
                    if (vis != null) {
                        m_subj.put(DOC, new Text(writeStatement(stmt, true)), vis, EMPTY_VALUE);
                        m_subj.put(INDEX, new Text(writeStatement(stmt, false)), vis, EMPTY_VALUE);
                    } else {
                        m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE);
                        m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE);
                    }
                }

                /**
                 * TODO: Is this right?
                 * If the subject does not have any authorizations specified, then anyone can access it.
                 * But the true authorization check will happen at the predicate/object level, which means that
                 * the set returned will only be what the person is authorized to see. The shard lookup table has to
                 * have the lowest level authorization all the predicate/object authorizations; otherwise,
                 * a user may not be able to see the correct document.
                 */
                Mutation m_shard = new Mutation(new Text(subj_bytes));
                m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE);
                shardTableWriter.addMutation(m_shard);

                writer.addMutation(m_subj);
            }

            writer.flush();
            shardTableWriter.flush();
            statements.clear();
        } catch (Exception e) {
            throw new SailException(e);
        }
        finally {
        }
    }

    /** Discards all buffered (uncommitted) statements. */
    @Override
    protected void rollbackInternal() throws SailException {
        statements.clear();
    }

    /**
     * Buffers a statement in memory, grouped by subject; nothing is written
     * until commitInternal.
     */
    @Override
    protected void addStatementInternal(Resource subject, URI predicate, Value object, Resource... contexts) throws SailException {
        statements.put(subject, new ContextsStatementImpl(subject, predicate, object, contexts));
    }

    /** Not supported by this store. */
    @Override
    protected void removeStatementsInternal(Resource resource, URI uri, Value value, Resource... contexts) throws SailException {
        throw new UnsupportedOperationException("Remove not supported as of yet");
    }

    /** Not supported by this store. */
    @Override
    protected void clearInternal(Resource... resources) throws SailException {
        throw new UnsupportedOperationException("Clear with context not supported as of yet");
    }

    /** Iterates the namespace ("ns") entries stored in the partition table. */
    @Override
    protected CloseableIteration<? extends Namespace, SailException> getNamespacesInternal() throws SailException {
        return new NamespaceIterator(sail.connector, sail.table);
    }

    /**
     * Looks up a single namespace by prefix: scans the prefix row restricted
     * to the "ns" column family and returns the first column qualifier found,
     * or null if the prefix is not mapped.
     */
    @Override
    protected String getNamespaceInternal(String prefix) throws SailException {
        try {
            Scanner scanner = sail.connector.createScanner(sail.table, ALL_AUTHORIZATIONS);
            scanner.setRange(new Range(new Text(prefix)));
            scanner.fetchColumnFamily(NAMESPACE);
            Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator();
            if (iter != null && iter.hasNext())
                return iter.next().getKey().getColumnQualifier().toString();
        } catch (Exception e) {
            throw new SailException(e);
        }
        return null;
    }

    /**
     * Stores a prefix -> namespace mapping as row=prefix, family="ns",
     * qualifier=namespace. Written through the main batch writer, so it is
     * durable after the writer's next flush/close rather than immediately.
     */
    @Override
    protected void setNamespaceInternal(String prefix, String namespace) throws SailException {
        /**
         * Namespace ->
         * - <prefix> <namespace>:
         */

        try {
            Mutation m = new Mutation(new Text(prefix));
            m.put(NAMESPACE, new Text(namespace), EMPTY_VALUE);
            writer.addMutation(m);
        } catch (Exception e) {
            throw new SailException(e);
        }
    }

    /** Not supported by this store. */
    @Override
    protected void removeNamespaceInternal
            (String
                     s) throws SailException {
        throw new UnsupportedOperationException("Namespace remove not supported");
    }

    /** Not supported by this store. */
    @Override
    protected void clearNamespacesInternal
            () throws SailException {
        throw new UnsupportedOperationException("Namespace Clear not supported");
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
----------------------------------------------------------------------
diff --git a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java b/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
deleted file mode 100644
index cb69596..0000000
--- a/partition/partition.rdf/src/main/java/mvm/mmrts/rdf/partition/PartitionConstants.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package mvm.mmrts.rdf.partition;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.data.Value;
-import cloudbase.core.security.Authorizations;
-import mvm.mmrts.rdf.partition.query.operators.ShardSubjectLookup;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.openrdf.model.Literal;
-import org.openrdf.model.URI;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * Class PartitionConstants
- * Date: Jul 6, 2011
- * Time: 12:22:55 PM
- */
-public class PartitionConstants {
-
- public static final String PARTITION_NS = "urn:mvm.mmrts.partition.rdf/08/2011#";
- public static ValueFactory VALUE_FACTORY = ValueFactoryImpl.getInstance();
- public static URI TIMERANGE = VALUE_FACTORY.createURI(PARTITION_NS, "timeRange");
- public static URI SHARDRANGE = VALUE_FACTORY.createURI(PARTITION_NS, "shardRange"); //shardRange(subject, start, stop) in ms
- public static Literal EMPTY_LITERAL = VALUE_FACTORY.createLiteral(0);
-
- public static final byte FAMILY_DELIM = 0;
- public static final String FAMILY_DELIM_STR = "\0";
- public static final byte INDEX_DELIM = 1;
- public static final String INDEX_DELIM_STR = "\1";
-
- /* RECORD TYPES */
-// public static final int NAMESPACE_MARKER = 2;
-//
-// public static final int EXPL_TRIPLE_MARKER = 3;
-//
-// public static final int EXPL_QUAD_MARKER = 4;
-//
-// public static final int INF_TRIPLE_MARKER = 5;
-//
-// public static final int INF_QUAD_MARKER = 6;
-
- public static final int URI_MARKER = 7;
-
- public static final String URI_MARKER_STR = "\07";
-
- public static final int BNODE_MARKER = 8;
-
- public static final int PLAIN_LITERAL_MARKER = 9;
-
- public static final String PLAIN_LITERAL_MARKER_STR = "\u0009";
-
- public static final int LANG_LITERAL_MARKER = 10;
-
- public static final int DATATYPE_LITERAL_MARKER = 11;
-
- public static final String DATATYPE_LITERAL_MARKER_STR = "\u000B";
-
- public static final int EOF_MARKER = 127;
-
- // public static final Authorizations ALL_AUTHORIZATIONS = new Authorizations(
- // "_");
- public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS;
-
- public static final Value EMPTY_VALUE = new Value(new byte[0]);
- public static final Text EMPTY_TXT = new Text("");
-
- /* Column Families and Qualifiers */
- public static final Text INDEX = new Text("index");
- public static final Text DOC = new Text("event");
- public static final Text NAMESPACE = new Text("ns");
-
- /* Time constants */
- public static final String START_BINDING = "binding.start";
- public static final String END_BINDING = "binding.end";
- public static final String TIME_PREDICATE = "binding.timePredicate";
- public static final String SHARDRANGE_BINDING = "binding.shardRange";
- public static final String SHARDRANGE_START = "binding.shardRange.start";
- public static final String SHARDRANGE_END = "binding.shardRange.end";
- public static final String TIME_TYPE_PROP = "binding.timeProp";
- public static final String AUTHORIZATION_PROP = "binding.authorization";
- public static final String NUMTHREADS_PROP = "binding.numthreads";
- public static final String ALLSHARDS_PROP = "binding.allshards";
-
- public static final String VALUE_DELIMITER = "\03";
-
- public static final SimpleDateFormat XMLDATE = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-
- public enum TimeType {
- TIMESTAMP, XMLDATETIME
- }
-
- public static boolean isTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
- return (configuration.get(TIME_PREDICATE) != null) || (lookup.getTimePredicate() != null);
- }
-
- public static Long validateFillStartTime(Long start, ShardSubjectLookup lookup) {
- if (lookup.getShardStartTimeRange() != null)
- return Long.parseLong(lookup.getShardEndTimeRange());
- return (start == null) ? 0l : start;
- }
-
- public static Long validateFillEndTime(Long end, ShardSubjectLookup lookup) {
- if (lookup.getShardEndTimeRange() != null)
- return Long.parseLong(lookup.getShardEndTimeRange());
- return (end == null) ? System.currentTimeMillis() : end;
- }
-
- public static String getStartTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
- String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate());
- String st = configProperty(configuration, START_BINDING, lookup.getStartTimeRange());
- TimeType tt = lookup.getTimeType();
- if (tt == null)
- tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP));
- return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(st), tt);
- }
-
- public static String getEndTimeRange(ShardSubjectLookup lookup, Configuration configuration) {
- String tp = configProperty(configuration, TIME_PREDICATE, lookup.getTimePredicate());
- String et = configProperty(configuration, END_BINDING, lookup.getEndTimeRange());
- TimeType tt = lookup.getTimeType();
- if (tt == null)
- tt = TimeType.valueOf(configuration.get(TIME_TYPE_PROP));
- return URI_MARKER_STR + tp + INDEX_DELIM_STR + convertTime(Long.parseLong(et), tt);
- }
-
- public static String convertTime(Long timestamp, TimeType timeType) {
- return (TimeType.XMLDATETIME.equals(timeType))
- ? (DATATYPE_LITERAL_MARKER_STR + XMLDATE.format(new Date(timestamp)))
- : PLAIN_LITERAL_MARKER_STR + timestamp;
- }
-
- public static String configProperty(Configuration configuration, String property, String checkValue) {
- if (checkValue == null)
- return configuration.get(property);
- return checkValue;
- }
-}