Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:56 UTC
[44/49] incubator-rya git commit: RYA-7 POM and License Clean-up for Apache Move
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c03b124..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,318 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.client.admin.TableOperations;
-import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.util.TextUtil;
-import com.google.common.base.Preconditions;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.extractValue;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.from;
-
-/**
- * Takes large N-Triples files and uses MapReduce and Cloudbase
- * bulk ingest techniques to load them into the tables in our partition format.
- * <p/>
- * Input: N-Triples file
- * Map:
- * - key : shard row - Text
- * - value : stmt in doc triple format - Text
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Cloudbase Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
- public static final String WORKDIR_PROP = "bulk.n3.workdir";
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String zk = "10.40.190.129:2181";
- private String ttl = null;
- private String workDirBase = "/temp/bulkcb/work";
- private String format = RDFFormat.NTRIPLES.getName();
-
- @Override
- public int run(final String[] args) throws Exception {
- final Configuration conf = getConf();
- try {
- //conf
- zk = conf.get(MRUtils.CB_ZK_PROP, zk);
- ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
- instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
- userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
- pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
- workDirBase = conf.get(WORKDIR_PROP, workDirBase);
- format = conf.get(MRUtils.FORMAT_PROP, format);
- conf.set(MRUtils.FORMAT_PROP, format);
- final String inputDir = args[0];
-
- ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
- Connector connector = zooKeeperInstance.getConnector(userName, pwd);
- TableOperations tableOperations = connector.tableOperations();
-
- String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
- if (tablePrefix != null)
- RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
- String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
- tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
- tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
- Collection<Job> jobs = new ArrayList<Job>();
- for (final String tableName : tables) {
- PrintStream out = null;
- try {
- String workDir = workDirBase + "/" + tableName;
- System.out.println("Loading data into table[" + tableName + "]");
-
- Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
- job.setJarByClass(this.getClass());
- //settings for a long-running job
- Configuration jobConf = job.getConfiguration();
- jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
- jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
- jobConf.setBoolean("mapred.compress.map.output", true);
-// jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(ParseNtripsMapper.class);
- job.setMapOutputKeyClass(Key.class);
- job.setMapOutputValueClass(Value.class);
-
- job.setCombinerClass(OutStmtMutationsReducer.class);
- job.setReducerClass(OutStmtMutationsReducer.class);
- job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
- CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
-
- jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
-
- TextInputFormat.setInputPaths(job, new Path(inputDir));
-
- FileSystem fs = FileSystem.get(conf);
- Path workPath = new Path(workDir);
- if (fs.exists(workPath))
- fs.delete(workPath, true);
-
- CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
- out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
- if (!tableOperations.exists(tableName))
- tableOperations.create(tableName);
- Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
- for (Text split : splits)
- out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
- job.setNumReduceTasks(splits.size() + 1);
- out.close();
-
- job.setPartitionerClass(KeyRangePartitioner.class);
- RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
- jobConf.set(WORKDIR_PROP, workDir);
-
- job.submit();
- jobs.add(job);
-
- } catch (Exception re) {
- throw new RuntimeException(re);
- } finally {
- if (out != null)
- out.close();
- }
- }
-
- for (Job job : jobs) {
- while (!job.isComplete()) {
- Thread.sleep(1000);
- }
- }
-
- for (String tableName : tables) {
- String workDir = workDirBase + "/" + tableName;
- tableOperations.importDirectory(
- tableName,
- workDir + "/files",
- workDir + "/failures",
- 20,
- 4,
- false);
- }
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return 0;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * input: N-Triples formatted triple
- * <p/>
- * output: key: shard row from generator
- * value: stmt in serialized format (document format)
- */
- public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
- public static final String TABLE_PROPERTY = "parsentripsmapper.table";
-
- private RDFParser parser;
- private String rdfFormat;
- private String namedGraph;
- private RyaContext ryaContext = RyaContext.getInstance();
- private TripleRowResolver rowResolver = ryaContext.getTripleResolver();
-
- @Override
- protected void setup(final Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- final String table = conf.get(TABLE_PROPERTY);
- Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-
- final String cv_s = conf.get(MRUtils.CB_CV_PROP);
- final byte[] cv = cv_s == null ? null : cv_s.getBytes();
- rdfFormat = conf.get(MRUtils.FORMAT_PROP);
- checkNotNull(rdfFormat, "Rdf format cannot be null");
-
- namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
-
- parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
- parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
- parser.setRDFHandler(new RDFHandler() {
-
- @Override
- public void startRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void endRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
- }
-
- @Override
- public void handleStatement(Statement statement) throws RDFHandlerException {
- try {
- RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
- if(rs.getColumnVisibility() == null) {
- rs.setColumnVisibility(cv);
- }
-
- // Inject the specified context into the statement.
- if(namedGraph != null){
- rs.setContext(new RyaURI(namedGraph));
- }
-
- Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs);
-
- if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
- TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
- context.write(
- from(tripleRow),
- extractValue(tripleRow)
- );
- } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
- TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
- context.write(
- from(tripleRow),
- extractValue(tripleRow)
- );
- } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
- TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
- context.write(
- from(tripleRow),
- extractValue(tripleRow)
- );
- } else
- throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-
- } catch (Exception e) {
- throw new RDFHandlerException(e);
- }
- }
-
- @Override
- public void handleComment(String s) throws RDFHandlerException {
-
- }
- });
- }
-
- @Override
- public void map(LongWritable key, Text value, Context output)
- throws IOException, InterruptedException {
- String rdf = value.toString();
- try {
- parser.parse(new StringReader(rdf), "");
- } catch (RDFParseException e) {
- System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException("Exception occurred parsing triple[" + rdf + "]");
- }
- }
- }
-
- public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
- public void reduce(Key key, Iterable<Value> values, Context output)
- throws IOException, InterruptedException {
- output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
- }
- }
-}
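
A minimal sketch of driving the deleted BulkNtripsInputTool above, assuming its classes remain on the classpath; the property keys are the MRUtils constants removed later in this same commit, and all connection values and paths are placeholders:

    import mvm.rya.cloudbase.mr.fileinput.BulkNtripsInputTool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkIngestDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("cb.zk", "localhost:2181");       // MRUtils.CB_ZK_PROP
            conf.set("cb.instance", "dev");            // MRUtils.CB_INSTANCE_PROP
            conf.set("cb.username", "root");           // MRUtils.CB_USERNAME_PROP
            conf.set("cb.pwd", "secret");              // MRUtils.CB_PWD_PROP
            conf.set("rdf.tablePrefix", "rya_");       // MRUtils.TABLE_PREFIX_PROPERTY
            conf.set("rdf.format", "N-Triples");       // MRUtils.FORMAT_PROP
            conf.set("bulk.n3.workdir", "/temp/bulkcb/work"); // WORKDIR_PROP
            // the single positional argument is the HDFS directory of N-Triples input
            System.exit(ToolRunner.run(conf, new BulkNtripsInputTool(),
                    new String[]{"/data/ntriples"}));
        }
    }

The tool then runs one job per index table (SPO, PO, OSP), range-partitions map output by the table's current split points, writes Cloudbase file output, and finally calls importDirectory on each work directory.
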
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
deleted file mode 100644
index 5aed4a2..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.RyaTableMutationsFactory;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-/**
- * Does line-by-line bulk import of RDF files
- * Class RdfFileInputByLineTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputByLineTool implements Tool {
-
- private Configuration conf = new Configuration();
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String zk = "10.40.190.113:2181";
- private String tablePrefix = null;
- private RDFFormat format = RDFFormat.NTRIPLES;
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.set("io.sort.mb", "256");
- conf.setLong("mapred.task.timeout", 600000000);
-
- zk = conf.get(MRUtils.CB_ZK_PROP, zk);
- instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
- userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
- pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
- format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName()));
-
- String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-
- Job job = new Job(conf);
- job.setJarByClass(RdfFileInputByLineTool.class);
-
- // set up cloudbase input
- job.setInputFormatClass(TextInputFormat.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
-
- // set input output of the particular job
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-// job.setOutputKeyClass(LongWritable.class);
-// job.setOutputValueClass(StatementWritable.class);
-
- job.setOutputFormatClass(CloudbaseOutputFormat.class);
- CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
- // set mapper and reducer classes
- job.setMapperClass(TextToMutationMapper.class);
- job.setNumReduceTasks(0);
-// job.setReducerClass(Reducer.class);
-
- // set output
-// Path outputDir = new Path("/temp/sparql-out/testout");
-// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-// if (dfs.exists(outputDir))
-// dfs.delete(outputDir, true);
-//
-// FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- return (int) runJob(args);
- }
-
- public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
- protected RDFParser parser;
- private String prefix;
- private RDFFormat rdfFormat;
- protected Text spo_table;
- private Text po_table;
- private Text osp_table;
- private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
-
- public TextToMutationMapper() {
- }
-
- @Override
- protected void setup(final Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
- if (prefix != null) {
- RdfCloudTripleStoreConstants.prefixTables(prefix);
- }
-
- spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
- final String cv_s = conf.get(MRUtils.CB_CV_PROP);
- if (cv_s != null)
- cv = cv_s.getBytes();
-
- rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
- parser = Rio.createParser(rdfFormat);
- final RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-
- parser.setRDFHandler(new RDFHandler() {
-
- @Override
- public void startRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void endRDF() throws RDFHandlerException {
-
- }
-
- @Override
- public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
- }
-
- @Override
- public void handleStatement(Statement statement) throws RDFHandlerException {
- try {
- RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
- if(ryaStatement.getColumnVisibility() == null) {
- ryaStatement.setColumnVisibility(cv);
- }
- Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
- mut.serialize(ryaStatement);
- Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
- Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
- Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
- for (Mutation m : spo) {
- context.write(spo_table, m);
- }
- for (Mutation m : po) {
- context.write(po_table, m);
- }
- for (Mutation m : osp) {
- context.write(osp_table, m);
- }
- } catch (Exception e) {
- throw new RDFHandlerException(e);
- }
- }
-
- @Override
- public void handleComment(String s) throws RDFHandlerException {
-
- }
- });
- }
-
- @Override
- protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
- try {
- parser.parse(new StringReader(value.toString()), "");
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- }
-}
-
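
The mapper above feeds each input line through a Sesame Rio parser with an empty base URI and fans the resulting mutations out to the SPO, PO, and OSP tables. The conversion step in isolation, as a sketch assuming Sesame 2.x and its RDFHandlerBase helper:

    import java.io.StringReader;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.resolver.RdfToRyaConversions;
    import org.openrdf.model.Statement;
    import org.openrdf.rio.*;
    import org.openrdf.rio.helpers.RDFHandlerBase;

    public class LineParseExample {
        public static void main(String[] args) throws Exception {
            String line = "<urn:s> <urn:p> <urn:o> .";
            RDFParser parser = Rio.createParser(RDFFormat.NTRIPLES);
            parser.setRDFHandler(new RDFHandlerBase() {
                @Override
                public void handleStatement(Statement st) throws RDFHandlerException {
                    // same conversion the mapper performs before building mutations
                    RyaStatement rs = RdfToRyaConversions.convertStatement(st);
                    System.out.println(rs);
                }
            });
            parser.parse(new StringReader(line), ""); // empty base URI, as in the mapper
        }
    }
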
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index 54f9a13..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Reads multiple RDF-formatted files and converts their contents into statements.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
-
- @Override
- public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- return new RdfFileRecordReader();
- }
-
- private class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
-
- boolean closed = false;
- long count = 0;
- BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>();
- int total = 0;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- FileSplit fileSplit = (FileSplit) inputSplit;
- Configuration conf = taskAttemptContext.getConfiguration();
- String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); //default to RDF/XML
- RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-
- Path file = fileSplit.getPath();
- FileSystem fs = file.getFileSystem(conf);
- FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
- RDFParser rdfParser = Rio.createParser(rdfFormat);
- rdfParser.setRDFHandler(this);
- try {
- rdfParser.parse(fileIn, "");
- } catch (Exception e) {
- throw new IOException(e);
- }
- fileIn.close();
- total = queue.size();
- //TODO: Make this threaded so that you don't hold too many statements before sending them
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return queue.size() > 0;
- }
-
- @Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return new LongWritable(count++);
- }
-
- @Override
- public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
- return queue.poll();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return ((float) (total - queue.size())) / ((float) total);
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- }
-
- @Override
- public void startRDF() throws RDFHandlerException {
- }
-
- @Override
- public void endRDF() throws RDFHandlerException {
- }
-
- @Override
- public void handleNamespace(String s, String s1) throws RDFHandlerException {
- }
-
- @Override
- public void handleStatement(Statement statement) throws RDFHandlerException {
- queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement)));
- }
-
- @Override
- public void handleComment(String s) throws RDFHandlerException {
- }
- }
-
-}
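
The record reader above parses the entire split into a BlockingQueue inside initialize() before the first nextKeyValue() call, which is exactly what its TODO flags. A hypothetical threaded variant, keeping the same handler-into-queue design and using a sentinel to mark end-of-input (all names here are illustrative, not from the original):

    // parse on a background thread so statements stream through the queue
    // instead of all accumulating before the first record is read
    final RyaStatementWritable DONE = new RyaStatementWritable(null); // sentinel
    new Thread(new Runnable() {
        public void run() {
            try {
                rdfParser.parse(fileIn, "");
            } catch (Exception e) {
                e.printStackTrace(); // a real version would propagate this failure
            } finally {
                queue.add(DONE); // always unblock the consumer
            }
        }
    }).start();

    // nextKeyValue() would then block instead of checking queue.size():
    //     current = queue.take();
    //     return current != DONE;
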
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
deleted file mode 100644
index f48cbae..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.RyaTableMutationsFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.rio.RDFFormat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import static mvm.rya.cloudbase.mr.utils.MRUtils.*;
-
-/**
- * Does bulk import of RDF files
- * Class RdfFileInputTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputTool implements Tool {
-
- private Configuration conf;
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String zk = "10.40.190.113:2181";
- private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
- private String format = RDFFormat.RDFXML.getName();
-
-
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- //faster
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-
- zk = conf.get(CB_ZK_PROP, zk);
- instance = conf.get(CB_INSTANCE_PROP, instance);
- userName = conf.get(CB_USERNAME_PROP, userName);
- pwd = conf.get(CB_PWD_PROP, pwd);
-
- tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, tablePrefix);
- format = conf.get(FORMAT_PROP, format);
- conf.set(FORMAT_PROP, format);
-
- Job job = new Job(conf);
- job.setJarByClass(RdfFileInputTool.class);
-
- // set up cloudbase input
- job.setInputFormatClass(RdfFileInputFormat.class);
- RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
- // set input output of the particular job
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(RyaStatementWritable.class);
-// job.setOutputKeyClass(LongWritable.class);
-// job.setOutputValueClass(StatementWritable.class);
-
- job.setOutputFormatClass(CloudbaseOutputFormat.class);
- CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
- // set mapper and reducer classes
- job.setMapperClass(StatementToMutationMapper.class);
- job.setNumReduceTasks(0);
-// job.setReducerClass(Reducer.class);
-
- // set output
-// Path outputDir = new Path("/temp/sparql-out/testout");
-// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-// if (dfs.exists(outputDir))
-// dfs.delete(outputDir, true);
-//
-// FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> {
- protected String tablePrefix;
- protected Text spo_table;
- protected Text po_table;
- protected Text osp_table;
- private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
- RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-
- public StatementToMutationMapper() {
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
- spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
- po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
- osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
- final String cv_s = conf.get(CB_CV_PROP);
- if (cv_s != null)
- cv = cv_s.getBytes();
- }
-
- @Override
- protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
- RyaStatement statement = value.getRyaStatement();
- if (statement.getColumnVisibility() == null) {
- statement.setColumnVisibility(cv);
- }
- Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
- mut.serialize(statement);
- Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
- Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
- Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
- for (Mutation m : spo) {
- context.write(spo_table, m);
- }
- for (Mutation m : po) {
- context.write(po_table, m);
- }
- for (Mutation m : osp) {
- context.write(osp_table, m);
- }
- }
-
- }
-}
-
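
Unlike RdfFileInputByLineTool, this tool reads whole files through RdfFileInputFormat, so formats that cannot be parsed line by line, such as its RDF/XML default, work here. A hedged invocation sketch with placeholder values:

    Configuration conf = new Configuration();
    conf.set("cb.zk", "localhost:2181");
    conf.set("cb.instance", "dev");
    conf.set("rdf.format", RDFFormat.RDFXML.getName()); // a whole-file format the
                                                        // line-based tool could not handle
    ToolRunner.run(conf, new RdfFileInputTool(), new String[]{"/data/rdfxml"});
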
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
deleted file mode 100644
index 5d7d971..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
+++ /dev/null
@@ -1,314 +0,0 @@
-//package mvm.rya.cloudbase.mr.fileinput;
-//
-//import cloudbase.core.client.Connector;
-//import cloudbase.core.client.ZooKeeperInstance;
-//import cloudbase.core.client.admin.TableOperations;
-//import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-//import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-//import cloudbase.core.data.Key;
-//import cloudbase.core.data.Value;
-//import cloudbase.core.util.TextUtil;
-//import com.google.common.base.Preconditions;
-//import mvm.rya.api.RdfCloudTripleStoreConstants;
-//import mvm.rya.cloudbase.CloudbaseRdfConstants;
-//import mvm.rya.cloudbase.RyaTableKeyValues;
-//import mvm.rya.cloudbase.mr.utils.MRUtils;
-//import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-//import mvm.rya.cloudbase.utils.shard.HashAlgorithm;
-//import mvm.rya.cloudbase.utils.shard.HashCodeHashAlgorithm;
-//import org.apache.commons.codec.binary.Base64;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.conf.Configured;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.Mapper;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-//import org.apache.hadoop.util.Tool;
-//import org.apache.hadoop.util.ToolRunner;
-//import org.openrdf.model.Resource;
-//import org.openrdf.model.Statement;
-//import org.openrdf.rio.*;
-//
-//import java.io.BufferedOutputStream;
-//import java.io.IOException;
-//import java.io.PrintStream;
-//import java.io.StringReader;
-//import java.util.ArrayList;
-//import java.util.Collection;
-//import java.util.Map;
-//
-//import static com.google.common.base.Preconditions.checkNotNull;
-//
-///**
-//* Takes large N-Triples files and uses MapReduce and Cloudbase
-//* bulk ingest techniques to load them into the tables in our partition format.
-//* Uses a sharded scheme.
-//* <p/>
-//* Input: N-Triples file
-//* Map:
-//* - key : shard row - Text
-//* - value : stmt in doc triple format - Text
-//* Partitioner: RangePartitioner
-//* Reduce:
-//* - key : all the entries for each triple - Cloudbase Key
-//* Class ShardedBulkNtripsInputTool
-//* Date: Sep 13, 2011
-//* Time: 10:00:17 AM
-//*/
-//public class ShardedBulkNtripsInputTool extends Configured implements Tool {
-//
-// public static final String WORKDIR_PROP = "bulk.n3.workdir";
-// public static final String BULK_N3_NUMSHARD = "bulk.n3.numshard";
-//
-// private String userName = "root";
-// private String pwd = "password";
-// private String instance = "stratus";
-// private String zk = "10.40.190.129:2181";
-// private String ttl = null;
-// private String workDirBase = "/temp/bulkcb/work";
-// private String format = RDFFormat.NTRIPLES.getName();
-// private int numShards;
-//
-// @Override
-// public int run(final String[] args) throws Exception {
-// final Configuration conf = getConf();
-// try {
-// //conf
-// zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-// ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
-// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-// workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-// format = conf.get(MRUtils.FORMAT_PROP, format);
-// String numShards_s = conf.get(BULK_N3_NUMSHARD);
-// Preconditions.checkArgument(numShards_s != null);
-// numShards = Integer.parseInt(numShards_s);
-// conf.set(MRUtils.FORMAT_PROP, format);
-// final String inputDir = args[0];
-//
-// ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-// Connector connector = zooKeeperInstance.getConnector(userName, pwd);
-// TableOperations tableOperations = connector.tableOperations();
-//
-// String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-// if (tablePrefix != null)
-// RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-// String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-// tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-// tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-// Collection<Job> jobs = new ArrayList<Job>();
-// for (final String table : tables) {
-// for (int i = 0; i < numShards; i++) {
-// final String tableName = table + i;
-// PrintStream out = null;
-// try {
-// String workDir = workDirBase + "/" + tableName;
-// System.out.println("Loading data into table[" + tableName + "]");
-//
-// Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
-// job.setJarByClass(this.getClass());
-// //settings for a long-running job
-// job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
-// job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
-// job.getConfiguration().set("io.sort.mb", "256");
-// job.getConfiguration().setBoolean("mapred.compress.map.output", true);
-// job.getConfiguration().set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-//
-// job.setInputFormatClass(TextInputFormat.class);
-//
-// job.setMapperClass(ShardedParseNtripsMapper.class);
-// job.setMapOutputKeyClass(Key.class);
-// job.setMapOutputValueClass(Value.class);
-//
-// job.setCombinerClass(OutStmtMutationsReducer.class);
-// job.setReducerClass(OutStmtMutationsReducer.class);
-// job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
-// CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
-//
-// job.getConfiguration().set(ShardedParseNtripsMapper.TABLE_PROPERTY, tableName);
-// job.getConfiguration().set(ShardedParseNtripsMapper.SHARD_PROPERTY, i + "");
-//
-// TextInputFormat.setInputPaths(job, new Path(inputDir));
-//
-// FileSystem fs = FileSystem.get(conf);
-// Path workPath = new Path(workDir);
-// if (fs.exists(workPath))
-// fs.delete(workPath, true);
-//
-// CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-//
-// out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-//
-// if (!tableOperations.exists(tableName))
-// tableOperations.create(tableName);
-// Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-// for (Text split : splits)
-// out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-//
-// job.setNumReduceTasks(splits.size() + 1);
-// out.close();
-//
-// job.setPartitionerClass(KeyRangePartitioner.class);
-// RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-//
-// job.getConfiguration().set(WORKDIR_PROP, workDir);
-//
-// job.submit();
-// jobs.add(job);
-//
-// } catch (Exception re) {
-// throw new RuntimeException(re);
-// } finally {
-// if (out != null)
-// out.close();
-// }
-// }
-// }
-//
-// for (Job job : jobs) {
-// while (!job.isComplete()) {
-// Thread.sleep(1000);
-// }
-// }
-//
-// for (String table : tables) {
-// for (int i = 0; i < numShards; i++) {
-// final String tableName = table + i;
-// String workDir = workDirBase + "/" + tableName;
-// tableOperations.importDirectory(
-// tableName,
-// workDir + "/files",
-// workDir + "/failures",
-// 20,
-// 4,
-// false);
-// }
-// }
-//
-// } catch (Exception e) {
-// throw new RuntimeException(e);
-// }
-//
-// return 0;
-// }
-//
-// public static void main(String[] args) {
-// try {
-// ToolRunner.run(new Configuration(), new ShardedBulkNtripsInputTool(), args);
-// } catch (Exception e) {
-// e.printStackTrace();
-// }
-// }
-//
-// /**
-// * input: ntrips format triple
-// * <p/>
-// * output: key: shard row from generator
-// * value: stmt in serialized format (document format)
-// */
-// public static class ShardedParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-// public static final String TABLE_PROPERTY = "shardedparsentripsmapper.table";
-// public static final String SHARD_PROPERTY = "shardedparsentripsmapper.shard";
-//
-// private RDFParser parser;
-// private String rdfFormat;
-// private HashAlgorithm hashAlgorithm = new HashCodeHashAlgorithm();
-// private int shard;
-// private int numShards;
-//
-// @Override
-// protected void setup(final Context context) throws IOException, InterruptedException {
-// super.setup(context);
-// Configuration conf = context.getConfiguration();
-// final String table = conf.get(TABLE_PROPERTY);
-// Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-//
-// String shard_s = conf.get(SHARD_PROPERTY);
-// Preconditions.checkNotNull(shard_s, "Set the " + SHARD_PROPERTY + " property");
-// shard = Integer.parseInt(shard_s);
-//
-// numShards = Integer.parseInt(conf.get(BULK_N3_NUMSHARD));
-//
-// final String cv_s = conf.get(MRUtils.CB_CV_PROP);
-// rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-// checkNotNull(rdfFormat, "Rdf format cannot be null");
-//
-// parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
-// parser.setRDFHandler(new RDFHandler() {
-//
-// @Override
-// public void startRDF() throws RDFHandlerException {
-//
-// }
-//
-// @Override
-// public void endRDF() throws RDFHandlerException {
-//
-// }
-//
-// @Override
-// public void handleNamespace(String s, String s1) throws RDFHandlerException {
-//
-// }
-//
-// @Override
-// public void handleStatement(Statement statement) throws RDFHandlerException {
-// try {
-// Resource subject = statement.getSubject();
-// if ((hashAlgorithm.hash(subject.stringValue()) % numShards) != shard) {
-// return;
-// }
-// RyaTableKeyValues rdfTableKeyValues = new RyaTableKeyValues(subject, statement.getPredicate(), statement.getObject(), cv_s, statement.getContext()).invoke();
-// Collection<Map.Entry<Key, Value>> entries = null;
-// if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-// entries = rdfTableKeyValues.getSpo();
-// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-// entries = rdfTableKeyValues.getPo();
-// } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-// entries = rdfTableKeyValues.getOsp();
-// } else
-// throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-//
-// for (Map.Entry<Key, Value> entry : entries) {
-// context.write(entry.getKey(), entry.getValue());
-// }
-// } catch (Exception e) {
-// throw new RDFHandlerException(e);
-// }
-// }
-//
-// @Override
-// public void handleComment(String s) throws RDFHandlerException {
-//
-// }
-// });
-// }
-//
-// @Override
-// public void map(LongWritable key, Text value, Context output)
-// throws IOException, InterruptedException {
-// String rdf = value.toString();
-// try {
-// parser.parse(new StringReader(rdf), "");
-// } catch (RDFParseException e) {
-// System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-// } catch (Exception e) {
-// e.printStackTrace();
-// throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-// }
-// }
-// }
-//
-// public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-//
-// public void reduce(Key key, Iterable<Value> values, Context output)
-// throws IOException, InterruptedException {
-// output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
-// }
-// }
-//}
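
The commented-out sharded variant routes statements by hashing the subject: the mapper for shard i keeps a statement only when hash(subject) % numShards == i, and a separate job per (table, shard) pair loads tableName + i. That routing rule, isolated as a sketch under the assumption that HashCodeHashAlgorithm is a plain String.hashCode()-style hash, as its name suggests:

    // which shard-suffixed table a statement belongs to
    static int shardFor(String subjectUri, int numShards) {
        int h = subjectUri.hashCode();
        // normalize negative hash codes so the bucket is always 0..numShards-1;
        // whether the original hash already guaranteed this is not visible here
        return ((h % numShards) + numShards) % numShards;
    }
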
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
deleted file mode 100644
index 453d6ca..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
+++ /dev/null
@@ -1,350 +0,0 @@
-//package mvm.rya.cloudbase.mr.upgrade;
-//
-//import cloudbase.core.client.Connector;
-//import cloudbase.core.client.ZooKeeperInstance;
-//import cloudbase.core.client.admin.TableOperations;
-//import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-//import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-//import cloudbase.core.data.Key;
-//import cloudbase.core.data.Mutation;
-//import cloudbase.core.data.Range;
-//import cloudbase.core.data.Value;
-//import cloudbase.core.security.Authorizations;
-//import cloudbase.core.security.ColumnVisibility;
-//import cloudbase.core.util.Pair;
-//import com.google.common.collect.Lists;
-//import com.google.common.io.ByteArrayDataInput;
-//import com.google.common.io.ByteArrayDataOutput;
-//import com.google.common.io.ByteStreams;
-//import mvm.rya.api.InvalidValueTypeMarkerRuntimeException;
-//import mvm.rya.api.RdfCloudTripleStoreConstants;
-//import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
-//import mvm.rya.cloudbase.CloudbaseRdfConstants;
-//import mvm.rya.cloudbase.CloudbaseRyaDAO;
-//import mvm.rya.cloudbase.RyaTableMutationsFactory;
-//import mvm.rya.cloudbase.mr.utils.MRUtils;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.conf.Configured;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.Mapper;
-//import org.apache.hadoop.util.Tool;
-//import org.apache.hadoop.util.ToolRunner;
-//import org.openrdf.model.*;
-//import org.openrdf.model.impl.StatementImpl;
-//import org.openrdf.model.impl.ValueFactoryImpl;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.Collection;
-//import java.util.Date;
-//import java.util.Map;
-//
-//import static mvm.rya.api.RdfCloudTripleStoreUtils.*;
-//
-///**
-// * 1. Check version. <br/>
-// * 2. If version does not exist, apply: <br/>
-// * - DELIM => 1 -> 0
-// * - DELIM_STOP => 2 -> 1
-// * - 3 table index
-// */
-//public class UpgradeCloudbaseRdfTables extends Configured implements Tool {
-// public static final String TMP = "_tmp";
-// public static final String DELETE_PROP = "rdf.upgrade.delete"; //true if ok to delete the old tables
-// private String zk = "10.40.190.113:2181";
-// private String instance = "stratus";
-// private String userName = "root";
-// private String pwd = "password";
-// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-// private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
-//
-// @Override
-// public int run(String[] strings) throws Exception {
-// conf = new CloudbaseRdfConfiguration(getConf());
-// //faster
-// conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-// conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-// conf.set(MRUtils.JOB_NAME_PROP, "Upgrading Cloudbase Rdf Tables");
-//
-// zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-// instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-// userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-// pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-//
-// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-//
-// Authorizations authorizations = CloudbaseRdfConstants.ALL_AUTHORIZATIONS;
-// String auth = conf.get(MRUtils.CB_AUTH_PROP);
-// if (auth != null)
-// authorizations = new Authorizations(auth.split(","));
-//
-// boolean deleteTables = conf.getBoolean(DELETE_PROP, false);
-//
-// //tables
-// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-// String so = tablePrefix + "so";
-// String ops = tablePrefix + "o";
-//
-// //check version first
-// Connector connector = new ZooKeeperInstance(instance, zk).getConnector(userName, pwd.getBytes());
-// CloudbaseRyaDAO rdfDAO = new CloudbaseRyaDAO();
-// rdfDAO.setConnector(connector);
-// conf.setTablePrefix(tablePrefix);
-// rdfDAO.setConf(conf);
-//// rdfDAO.setSpoTable(spo);
-//// rdfDAO.setPoTable(po);
-//// rdfDAO.setOspTable(osp);
-//// rdfDAO.setNamespaceTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
-// rdfDAO.init();
-// String version = rdfDAO.getVersion();
-// if (version != null) {
-// //TODO: Do a version check here
-// //version found, no need to upgrade
-// return 0;
-// }
-//
-// rdfDAO.destroy();
-//
-// //create osp table, delete the so and o tables
-// TableOperations tableOperations = connector.tableOperations();
-// if (deleteTables) {
-// if (tableOperations.exists(so)) {
-// tableOperations.delete(so);
-// }
-// if (tableOperations.exists(ops)) {
-// tableOperations.delete(ops);
-// }
-// }
-//
-// conf.set("io.sort.mb", "256");
-// Job job = new Job(conf);
-// job.setJarByClass(UpgradeCloudbaseRdfTables.class);
-//
-// //set up cloudbase input
-// job.setInputFormatClass(CloudbaseInputFormat.class);
-// CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), spo, authorizations);
-// CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
-// Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>();
-// final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT);
-// columns.add(pair);
-// CloudbaseInputFormat.fetchColumns(job, columns);
-//
-// CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
-//
-// // set input output of the particular job
-// job.setMapOutputKeyClass(Text.class);
-// job.setMapOutputValueClass(Mutation.class);
-//
-// //no reducer needed?
-// job.setNumReduceTasks(0);
-// job.setMapperClass(UpgradeCloudbaseRdfTablesMapper.class);
-//
-// CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, spo + TMP);
-// CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-// job.setOutputFormatClass(CloudbaseOutputFormat.class);
-//
-// // Submit the job
-// Date startTime = new Date();
-// System.out.println("Job started: " + startTime);
-// int exitCode = job.waitForCompletion(true) ? 0 : 1;
-//
-// if (exitCode == 0) {
-// Date end_time = new Date();
-// System.out.println("Job ended: " + end_time);
-// System.out.println("The job took "
-// + (end_time.getTime() - startTime.getTime()) / 1000
-// + " seconds.");
-//
-// //now delete the old tables, and rename the tmp ones into place
-// if (deleteTables) {
-// tableOperations.delete(spo);
-// tableOperations.rename(spo + TMP, spo);
-// tableOperations.delete(po);
-// tableOperations.rename(po + TMP, po);
-// tableOperations.delete(osp);
-// tableOperations.rename(osp + TMP, osp);
-// }
-//
-// return 0;
-// } else {
-// System.out.println("Job Failed!!!");
-// }
-//
-// return -1;
-// }
-//
-// public static void main(String[] args) {
-// try {
-// ToolRunner.run(new Configuration(), new UpgradeCloudbaseRdfTables(), args);
-// } catch (Exception e) {
-// e.printStackTrace();
-// }
-// }
-//
-// public static class UpgradeCloudbaseRdfTablesMapper extends Mapper<Key, Value, Text, Mutation> {
-// private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-// ValueFactoryImpl vf = new ValueFactoryImpl();
-//
-// private Text spo_table, po_table, osp_table;
-//
-// RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-//
-// @Override
-// protected void setup(Context context) throws IOException, InterruptedException {
-// super.setup(context);
-// Configuration conf = context.getConfiguration();
-// tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-// String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX + TMP;
-// String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX + TMP;
-// String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX + TMP;
-//
-// spo_table = new Text(spo);
-// po_table = new Text(po);
-// osp_table = new Text(osp);
-// }
-//
-// @Override
-// protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-// //read in old format
-// Statement statement = null;
-// try {
-// statement = translateOldStatementFromRow(ByteStreams.newDataInput(key.getRow().getBytes()), "spo", vf);
-// } catch (Exception e) {
-// //not the right version
-// return;
-// }
-//
-// //translate to new format and save in new tables
-// Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Mutation> mutationMap = mut.serialize(statement.getSubject(), statement.getPredicate(), statement.getObject(), new ColumnVisibility(key.getColumnVisibility()), statement.getContext());
-// Mutation spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-// Mutation po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-// Mutation osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-//
-// context.write(spo_table, spo);
-// context.write(po_table, po);
-// context.write(osp_table, osp);
-//
-// //TODO: Contexts
-// }
-// }
-//
-// public static org.openrdf.model.Value readOldValue(ByteArrayDataInput dataIn, ValueFactory vf)
-// throws IOException, ClassCastException {
-// int valueTypeMarker;
-// try {
-// valueTypeMarker = dataIn.readByte();
-// } catch (Exception e) {
-// return null;
-// }
-//
-// org.openrdf.model.Value ret = null;
-// if (valueTypeMarker == RdfCloudTripleStoreConstants.URI_MARKER) {
-// String uriString = readString(dataIn);
-// ret = vf.createURI(uriString);
-// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.BNODE_MARKER) {
-// String bnodeID = readString(dataIn);
-// ret = vf.createBNode(bnodeID);
-// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER) {
-// String label = readString(dataIn);
-// ret = vf.createLiteral(label);
-// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER) {
-// String label = readString(dataIn);
-// String language = readString(dataIn);
-// ret = vf.createLiteral(label, language);
-// } else if (valueTypeMarker == RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER) {
-// String label = readString(dataIn);
-// URI datatype = (URI) readOldValue(dataIn, vf);
-// ret = vf.createLiteral(label, datatype);
-// } else {
-// throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: "
-// + valueTypeMarker);
-// }
-//
-// return ret;
-// }
-//
-// public static Statement translateOldStatementFromRow(ByteArrayDataInput input, String table, ValueFactory vf) throws IOException {
-// Resource subject;
-// URI predicate;
-// org.openrdf.model.Value object;
-// if ("spo".equals(table)) {
-// subject = (Resource) readOldValue(input, vf);
-// input.readByte();
-// predicate = (URI) readOldValue(input, vf);
-// input.readByte();
-// object = readOldValue(input, vf);
-// } else if ("o".equals(table)) {
-// object = readOldValue(input, vf);
-// input.readByte();
-// predicate = (URI) readOldValue(input, vf);
-// input.readByte();
-// subject = (Resource) readOldValue(input, vf);
-// } else if ("po".equals(table)) {
-// predicate = (URI) readOldValue(input, vf);
-// input.readByte();
-// object = readOldValue(input, vf);
-// input.readByte();
-// subject = (Resource) readOldValue(input, vf);
-// } else {
-// //so
-// subject = (Resource) readOldValue(input, vf);
-// input.readByte();
-// object = readOldValue(input, vf);
-// input.readByte();
-// predicate = (URI) readOldValue(input, vf);
-// }
-// return new StatementImpl(subject, predicate, object);
-// }
-//
-// public static byte[] writeOldValue(org.openrdf.model.Value value) throws IOException {
-// if (value == null)
-// return new byte[]{};
-// ByteArrayDataOutput dataOut = ByteStreams.newDataOutput();
-// if (value instanceof URI) {
-// dataOut.writeByte(RdfCloudTripleStoreConstants.URI_MARKER);
-// writeString(((URI) value).toString(), dataOut);
-// } else if (value instanceof BNode) {
-// dataOut.writeByte(RdfCloudTripleStoreConstants.BNODE_MARKER);
-// writeString(((BNode) value).getID(), dataOut);
-// } else if (value instanceof Literal) {
-// Literal lit = (Literal) value;
-//
-// String label = lit.getLabel();
-// String language = lit.getLanguage();
-// URI datatype = lit.getDatatype();
-//
-// if (datatype != null) {
-// dataOut.writeByte(RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER);
-// writeString(label, dataOut);
-// dataOut.write(writeOldValue(datatype));
-// } else if (language != null) {
-// dataOut.writeByte(RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER);
-// writeString(label, dataOut);
-// writeString(language, dataOut);
-// } else {
-// dataOut.writeByte(RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER);
-// writeString(label, dataOut);
-// }
-// } else {
-// throw new IllegalArgumentException("unexpected value type: "
-// + value.getClass());
-// }
-// return dataOut.toByteArray();
-// }
-//
-// private static String OLD_DELIM = "\u0001";
-// private static byte[] OLD_DELIM_BYTES = OLD_DELIM.getBytes();
-//
-// public static byte[] buildOldRowWith(byte[] bytes_one, byte[] bytes_two, byte[] bytes_three) throws IOException {
-// ByteArrayDataOutput rowidout = ByteStreams.newDataOutput();
-// rowidout.write(bytes_one);
-// rowidout.write(OLD_DELIM_BYTES);
-// rowidout.write(bytes_two);
-// rowidout.write(OLD_DELIM_BYTES);
-// rowidout.write(bytes_three);
-// return truncateRowId(rowidout.toByteArray());
-// }
-//}
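
The commented-out upgrade depends on the old row encoding shown at the bottom of the file: each value is written with a one-byte type marker (URI, bnode, or one of three literal kinds), and the three values are joined with the \u0001 delimiter. Had the helpers been restored, building an old-format SPO row would look roughly like:

    ValueFactory vf = new ValueFactoryImpl();
    // [marker][subject] 0x01 [marker][predicate] 0x01 [marker][object]
    byte[] oldSpoRow = buildOldRowWith(
            writeOldValue(vf.createURI("urn:s")),
            writeOldValue(vf.createURI("urn:p")),
            writeOldValue(vf.createLiteral("o")));
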
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
deleted file mode 100644
index 950f585..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package mvm.rya.cloudbase.mr.utils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.openrdf.model.URI;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-/**
- * Class MRUtils
- * Date: May 19, 2011
- * Time: 10:34:06 AM
- */
-public class MRUtils {
-
- public static final String JOB_NAME_PROP = "mapred.job.name";
-
- public static final String CB_USERNAME_PROP = "cb.username";
- public static final String CB_PWD_PROP = "cb.pwd";
- public static final String CB_ZK_PROP = "cb.zk";
- public static final String CB_INSTANCE_PROP = "cb.instance";
- public static final String CB_TTL_PROP = "cb.ttl";
- public static final String CB_CV_PROP = "cb.cv";
- public static final String CB_AUTH_PROP = "cb.auth";
- public static final String CB_MOCK_PROP = "cb.mock";
- public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
- public static final String FORMAT_PROP = "rdf.format";
-
- public static final String NAMED_GRAPH_PROP = "rdf.graph";
-
- public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
-
- // rdf constants
- public static final ValueFactory vf = new ValueFactoryImpl();
- public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type");
-
-
- // cloudbase map reduce utils
-
-// public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException {
-// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
-// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key));
-// if (entry_val != null) {
-// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES);
-// startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val));
-// }
-// byte[] startrow = startRowOut.toByteArray();
-// startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES);
-// byte[] stoprow = startRowOut.toByteArray();
-//
-// Range range = new Range(new Text(startrow), new Text(stoprow));
-// return range;
-// }
-
-
- public static String getCBTtl(Configuration conf) {
- return conf.get(CB_TTL_PROP);
- }
-
- public static String getCBUserName(Configuration conf) {
- return conf.get(CB_USERNAME_PROP);
- }
-
- public static String getCBPwd(Configuration conf) {
- return conf.get(CB_PWD_PROP);
- }
-
- public static String getCBZK(Configuration conf) {
- return conf.get(CB_ZK_PROP);
- }
-
- public static String getCBInstance(Configuration conf) {
- return conf.get(CB_INSTANCE_PROP);
- }
-
- public static void setCBUserName(Configuration conf, String str) {
- conf.set(CB_USERNAME_PROP, str);
- }
-
- public static void setCBPwd(Configuration conf, String str) {
- conf.set(CB_PWD_PROP, str);
- }
-
- public static void setCBZK(Configuration conf, String str) {
- conf.set(CB_ZK_PROP, str);
- }
-
- public static void setCBInstance(Configuration conf, String str) {
- conf.set(CB_INSTANCE_PROP, str);
- }
-
- public static void setCBTtl(Configuration conf, String str) {
- conf.set(CB_TTL_PROP, str);
- }
-}
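
MRUtils pairs each cb.* property with getter and setter helpers so that job drivers can configure a connection without hard-coding the key strings:

    Configuration conf = new Configuration();
    MRUtils.setCBZK(conf, "localhost:2181");  // equivalent to -Dcb.zk=localhost:2181
    MRUtils.setCBInstance(conf, "dev");
    MRUtils.setCBUserName(conf, "root");
    MRUtils.setCBPwd(conf, "secret");
    MRUtils.setCBTtl(conf, "86400000");       // optional cb.ttl
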
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
deleted file mode 100644
index d3f8ae7..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package mvm.rya.cloudbase.query;
-
-import cloudbase.core.client.BatchScanner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import com.google.common.base.Preconditions;
-import mango.collect.AbstractCloseableIterable;
-import mvm.rya.cloudbase.BatchScannerIterator;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
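- * Wraps a BatchScanner as a CloseableIterable so that closing the iterable
- * also closes the underlying scanner.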
- */
-public class BatchScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
-
- private BatchScanner scanner;
-
- public BatchScannerCloseableIterable(BatchScanner scanner) {
- Preconditions.checkNotNull(scanner);
- this.scanner = scanner;
- }
-
- @Override
- protected void doClose() throws IOException {
- scanner.close();
- }
-
- @Override
- protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
- return new BatchScannerIterator(scanner.iterator());
- }
-}
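
A usage sketch, assuming the Cloudbase connector API mirrors the later Accumulo createBatchScanner(table, authorizations, numQueryThreads) and that mango's CloseableIterable exposes close():

    BatchScanner scanner = connector.createBatchScanner("rya_spo", auths, 4);
    scanner.setRanges(ranges);
    BatchScannerCloseableIterable iterable = new BatchScannerCloseableIterable(scanner);
    try {
        for (Map.Entry<Key, Value> entry : iterable) {
            // process entry.getKey() / entry.getValue()
        }
    } finally {
        iterable.close(); // doClose() shuts the underlying BatchScanner
    }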