Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:36 UTC
[24/49] incubator-rya git commit: RYA-7 POM and License Clean-up for Apache Move
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/assembly/job.xml b/partition/mr.partition.rdf/src/main/assembly/job.xml
deleted file mode 100644
index 259b917..0000000
--- a/partition/mr.partition.rdf/src/main/assembly/job.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <id>job</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <unpack>false</unpack>
- <scope>runtime</scope>
- <outputDirectory>lib</outputDirectory>
- <excludes>
- <exclude>org.apache.hadoop:hadoop-core</exclude>
- <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
- </excludes>
- </dependencySet>
- <dependencySet>
- <unpack>false</unpack>
- <scope>system</scope>
- <outputDirectory>lib</outputDirectory>
- <excludes>
- <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/classes</directory>
- <outputDirectory>/</outputDirectory>
- <excludes>
- <exclude>*.jar</exclude>
- </excludes>
- </fileSet>
- </fileSets>
-</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy b/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
deleted file mode 100644
index e5e02ec..0000000
--- a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
+++ /dev/null
@@ -1,33 +0,0 @@
-import org.openrdf.rio.rdfxml.*
-import org.openrdf.rio.ntriples.NTriplesWriterFactory
-import org.openrdf.rio.RDFHandler
-
-@Grab(group='com.google.guava', module='guava', version='r06')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-rdfxml', version='2.3.2')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-ntriples', version='2.3.2')
-@Grab(group='org.slf4j', module='slf4j-simple', version='1.5.8')
-def convertDirRdfFormat(def dir, def outputFile) {
- //read each file
- assert dir.isDirectory()
-
- def ntriplesWriter = NTriplesWriterFactory.newInstance().getWriter(new FileOutputStream(outputFile))
-
- ntriplesWriter.startRDF()
- dir.listFiles().each { it ->
- //load file into rdfxml parser
- def rdfxmlParser = RDFXMLParserFactory.newInstance().getParser()
- rdfxmlParser.setRDFHandler(
- [ startRDF: {},
- endRDF: {},
- handleNamespace: { def prefix, def uri -> ntriplesWriter.handleNamespace(prefix, uri)},
- handleComment: {},
- handleStatement: { def stmt -> ntriplesWriter.handleStatement stmt}] as RDFHandler
- )
- rdfxmlParser.parse(new FileInputStream(it), "")
- }
- ntriplesWriter.endRDF()
-}
-
-try{
-convertDirRdfFormat(new File(args[0]), new File(args[1]))
-}catch(Exception e) {e.printStackTrace();}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
deleted file mode 100644
index e8b2e5a..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFJob;
-
-import java.io.FileInputStream;
-
-/**
- * Class MrTstBed
- * Date: Sep 1, 2011
- * Time: 9:18:53 AM
- */
-public class MrTstBed {
- public static void main(String[] args) {
- try {
-// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-// "SELECT * WHERE\n" +
-// "{\n" +
-// "?id tdp:reportedAt ?timestamp. \n" +
-// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314898074000 , 1314898374000 , 'XMLDATETIME')).\n" +
-// "?id tdp:performedBy ?system.\n" +
-// "?id <http://here/2010/cmv/ns#hasMarkingText> \"U\".\n" +
-// "?id rdf:type tdp:Sent.\n" +
-// "} \n";
-
- FileInputStream fis = new FileInputStream(args[0]);
- String query = new String(ByteStreams.toByteArray(fis));
- fis.close();
-
-// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-// "SELECT * WHERE\n" +
-// "{\n" +
-// "?id tdp:reportedAt ?timestamp.\n" +
-// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314381770000 , 1314381880000 , 'XMLDATETIME')).\n" +
-// "?id tdp:performedBy ?system.\n" +
-// "}";
-
- new SparqlCloudbaseIFJob("partitionRdf", "root", "password", "stratus", "stratus13:2181", "/temp/queryout", MrTstBed.class, query).run();
-
-// QueryParser parser = (new SPARQLParserFactory()).getParser();
-// TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr();
-// System.out.println(expr);
-//
-// final Configuration queryConf = new Configuration();
-// expr.visit(new FilterTimeIndexVisitor(queryConf));
-//
-// (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null);
-//
-// System.out.println(expr);
-//
-// //make sure of only one shardlookup
-// expr.visit(new QueryModelVisitorBase<RuntimeException>() {
-// int count = 0;
-//
-// @Override
-// public void meetOther(QueryModelNode node) throws RuntimeException {
-// super.meetOther(node);
-// count++;
-// if (count > 1)
-// throw new IllegalArgumentException("Query can only have one subject-star lookup");
-// }
-// });
-//
-// final Job job = new Job(queryConf);
-// job.setJarByClass(MrTstBed.class);
-//
-// expr.visit(new QueryModelVisitorBase<RuntimeException>() {
-// @Override
-// public void meetOther(QueryModelNode node) throws RuntimeException {
-// super.meetOther(node);
-//
-// //set up CloudbaseBatchScannerInputFormat here
-// if (node instanceof ShardSubjectLookup) {
-// System.out.println("Lookup: " + node);
-// try {
-// new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, "partitionRdf",
-// "root", "password", "stratus", "stratus13:2181");
-// } catch (QueryEvaluationException e) {
-// e.printStackTrace();
-// }
-// }
-// }
-// });
-//
-// Path outputDir = new Path("/temp/sparql-out/testout");
-// FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf);
-// if (dfs.exists(outputDir))
-// dfs.delete(outputDir, true);
-//
-// FileOutputFormat.setOutputPath(job, outputDir);
-//
-// // Submit the job
-// Date startTime = new Date();
-// System.out.println("Job started: " + startTime);
-// job.waitForCompletion(true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
deleted file mode 100644
index 15c9c79..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
+++ /dev/null
@@ -1,411 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.util.ArgumentChecker;
-import mvm.mmrts.rdf.partition.PartitionSail;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.*;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.*;
-import org.openrdf.repository.Repository;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-
-/**
- * Class SparqlPartitionStoreInputFormat
- * Date: Oct 28, 2010
- * Time: 11:48:17 AM
- */
-public class SparqlPartitionStoreInputFormat extends InputFormat<LongWritable, MapWritable> {
-
- public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.sparqlinputformat";
- public static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
- public static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
- public static final String USERNAME = PREFIX + ".username";
- public static final String PASSWORD = PREFIX + ".password";
-
- public static final String INSTANCE_NAME = PREFIX + ".instanceName";
- public static final String ZK = PREFIX + ".zk";
-
- public static final String STARTTIME = PREFIX + ".starttime";
- public static final String ENDTIME = PREFIX + ".endtime";
- public static final String TABLE = PREFIX + ".table";
- public static final String SHARD_TABLE = PREFIX + ".shardtable";
- public static final String SPARQL_QUERIES_PROP = PREFIX + ".sparql";
- public static final String MR_NUMTHREADS_PROP = PREFIX + ".numthreads";
-// public static final String RANGE_PROP = PREFIX + ".range";
-// public static final String NUM_RANGES_PROP = PREFIX + ".numranges";
-// public static final String TABLE_PREFIX_PROP = PREFIX + ".tablePrefix";
-// public static final String OFFSET_RANGE_PROP = PREFIX + ".offsetrange";
-
-// public static final String INFER_PROP = PREFIX + ".infer";
-
- private static final String UTF_8 = "UTF-8";
-
- private static final ValueFactory vf = ValueFactoryImpl.getInstance();
-
- static class SparqlInputSplit extends InputSplit implements Writable {
-
- protected String sparql;
- protected String startTime;
- protected String endTime;
- protected String table;
-// private Long offset;
-// private Long limit;
-
- private SparqlInputSplit() {
- }
-
- private SparqlInputSplit(String sparql, String startTime, String endTime, String table) {
- this.sparql = sparql;
- this.startTime = startTime;
- this.endTime = endTime;
- this.table = table;
-// this.offset = offset;
-// this.limit = limit;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{sparql};
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- boolean startTimeExists = startTime != null;
- dataOutput.writeBoolean(startTimeExists);
- if (startTimeExists)
- dataOutput.writeUTF(startTime);
-
- boolean endTimeExists = endTime != null;
- dataOutput.writeBoolean(endTimeExists);
- if (endTimeExists)
- dataOutput.writeUTF(endTime);
-
- dataOutput.writeUTF(table);
- dataOutput.writeUTF(sparql);
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- if (dataInput.readBoolean())
- this.startTime = dataInput.readUTF();
- if (dataInput.readBoolean())
- this.endTime = dataInput.readUTF();
- this.table = dataInput.readUTF();
- this.sparql = dataInput.readUTF();
- }
- }
-
- /**
- * Create a SparqlInputSplit for every sparql query.<br>
- * Separate a single sparql query into numRanges time ranges. For example,
- * a numRanges of 3, with a range of 1 day (in ms), and 1 query, yields 3 input splits
- * with the same query: the first range covers now back to one day ago, the second
- * covers one day ago back to two days ago, and the third covers two days ago back
- * indefinitely.
- * <br><br>
- * If numRanges is not set, or is set to 1, the input split covers only the given startTime and
- * ttl. If those are not set, all time is queried.
- *
- * @param job
- * @return
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
- validateOptions(job.getConfiguration());
- final Collection<String> queries = getSparqlQueries(job.getConfiguration());
- if (queries == null || queries.size() == 0)
- throw new IOException("Queries cannot be null or empty");
-
- String startTime_s = getStartTime(job.getConfiguration());
- String endTime_s = getEndTime(job.getConfiguration());
-
- List<InputSplit> splits = new ArrayList<InputSplit>();
- for (String query : queries) {
- splits.add(new SparqlInputSplit(query, startTime_s, endTime_s, getTable(job.getConfiguration())));
- }
- return splits;
- }
-
- @Override
- public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- return new SparqlResultsRecordReader(taskAttemptContext.getConfiguration());
- }
-
- protected static String getUsername(Configuration conf) {
- return conf.get(USERNAME);
- }
-
- /**
- * WARNING: The password is stored in the Configuration and shared with all
- * MapReduce tasks; It is BASE64 encoded to provide a charset safe
- * conversion to a string, and is not intended to be secure.
- */
- protected static String getPassword(Configuration conf) {
- return new String(Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()));
- }
-
- protected static String getInstance(Configuration conf) {
- return conf.get(INSTANCE_NAME);
- }
-
- public static void setSparqlQueries(JobContext job, String... queries) {
- if (queries == null || queries.length == 0)
- throw new IllegalArgumentException("Queries cannot be null or empty");
-
- final Configuration conf = job.getConfiguration();
- setSparqlQueries(conf, queries);
- }
-
- public static void setSparqlQueries(Configuration conf, String... queries) {
- try {
- Collection<String> qencs = new ArrayList<String>();
- for (String query : queries) {
- final String qenc = URLEncoder.encode(query, UTF_8);
- qencs.add(qenc);
- }
- conf.setStrings(SPARQL_QUERIES_PROP, qencs.toArray(new String[qencs.size()]));
- } catch (UnsupportedEncodingException e) {
- //what to do...
- e.printStackTrace();
- }
- }
-
- public static Collection<String> getSparqlQueries(Configuration conf) {
- Collection<String> queries = new ArrayList<String>();
- final Collection<String> qencs = conf.getStringCollection(SPARQL_QUERIES_PROP);
- for (String qenc : qencs) {
- queries.add(qenc);
- }
- return queries;
- }
-
- public static void setLongJob(JobContext job, Long time) {
- Configuration conf = job.getConfiguration();
- //need to make the runtime longer, default 30 min
- time = (time == null) ? 1800000 : time;
- conf.setLong("mapreduce.tasktracker.healthchecker.script.timeout", time);
- conf.set("mapred.child.java.opts", "-Xmx1G");
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- }
-
- public static void setInputInfo(JobContext job, String user, byte[] passwd) {
- Configuration conf = job.getConfiguration();
- if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
- throw new IllegalStateException("Input info can only be set once per job");
- conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
-
- ArgumentChecker.notNull(user, passwd);
- conf.set(USERNAME, user);
- conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
- }
-
- public static void setEndTime(JobContext job, String endTime) {
- Configuration conf = job.getConfiguration();
- conf.set(ENDTIME, endTime);
- }
-
- public static String getEndTime(Configuration conf) {
- return conf.get(ENDTIME);
- }
-
- public static void setNumThreads(JobContext job, int numThreads) {
- Configuration conf = job.getConfiguration();
- conf.setInt(MR_NUMTHREADS_PROP, numThreads);
- }
-
- public static int getNumThreads(Configuration conf) {
- return conf.getInt(MR_NUMTHREADS_PROP, -1);
- }
-
- public static void setTable(JobContext job, String table) {
- Configuration conf = job.getConfiguration();
- conf.set(TABLE, table);
- }
-
- public static String getTable(Configuration conf) {
- return conf.get(TABLE);
- }
-
- public static void setShardTable(JobContext job, String table) {
- Configuration conf = job.getConfiguration();
- conf.set(SHARD_TABLE, table);
- }
-
- public static String getShardTable(Configuration conf) {
- String t = conf.get(SHARD_TABLE);
- return (t != null) ? t : getTable(conf);
- }
-
- public static void setStartTime(JobContext job, String startTime) {
- Configuration conf = job.getConfiguration();
- conf.set(STARTTIME, startTime);
- }
-
- public static String getStartTime(Configuration conf) {
- return conf.get(STARTTIME);
- }
-
- public static void setZooKeeperInstance(JobContext job, String instanceName, String zk) {
- Configuration conf = job.getConfiguration();
- if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
- throw new IllegalStateException("Instance info can only be set once per job");
- conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-
- ArgumentChecker.notNull(instanceName, zk);
- conf.set(INSTANCE_NAME, instanceName);
- conf.set(ZK, zk);
- }
-
- protected static void validateOptions(Configuration conf) throws IOException {
- if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
- throw new IOException("Input info has not been set.");
- if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
- throw new IOException("Instance info has not been set.");
- if (conf.getStrings(SPARQL_QUERIES_PROP) == null)
- throw new IOException("Sparql queries have not been set.");
- }
-
- private class SparqlResultsRecordReader extends RecordReader<LongWritable, MapWritable>
-// implements TupleQueryResultWriter, Runnable
- {
-
- boolean closed = false;
- long count = 0;
- BlockingQueue<MapWritable> queue = new LinkedBlockingQueue<MapWritable>();
- private Repository repo;
- String query;
-
- Configuration conf;
- private TupleQueryResult result;
- private RepositoryConnection conn;
-
- public SparqlResultsRecordReader(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
- try {
- validateOptions(conf);
-
- SparqlInputSplit sis = (SparqlInputSplit) inputSplit;
- this.query = sis.sparql;
-
- // init RdfCloudTripleStore
- final PartitionSail store = new PartitionSail(new ZooKeeperInstance(getInstance(conf),
- conf.get(ZK)).getConnector(getUsername(conf), getPassword(conf).getBytes()), getTable(conf), getShardTable(conf));
-
- repo = new SailRepository(store);
- repo.initialize();
-
- conn = repo.getConnection();
- query = URLDecoder.decode(query, UTF_8);
- TupleQuery tupleQuery = conn.prepareTupleQuery(
- QueryLanguage.SPARQL, query);
-
- if (sis.startTime != null && sis.endTime != null) {
- tupleQuery.setBinding(START_BINDING, vf.createLiteral(sis.startTime));
- tupleQuery.setBinding(END_BINDING, vf.createLiteral(sis.endTime));
- }
-
- int threads = getNumThreads(conf);
- if (threads > 0) {
- tupleQuery.setBinding(NUMTHREADS_PROP, vf.createLiteral(threads));
- }
-
- result = tupleQuery.evaluate();
- } catch (Exception e) {
- throw new IOException("Exception occurred opening Repository", e);
- }
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- try {
- return result.hasNext();
- } catch (QueryEvaluationException e) {
- throw new IOException(e);
- }
-// return false;
- }
-
- @Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return new LongWritable(count++);
- }
-
- @Override
- public MapWritable getCurrentValue() throws IOException, InterruptedException {
- try {
- if (result.hasNext()) {
- BindingSet bindingSet = result.next();
- return transformRow(bindingSet);
- }
- return null;
- } catch (QueryEvaluationException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return (closed) ? (1) : (0);
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- try {
- conn.close();
- repo.shutDown();
- } catch (RepositoryException e) {
- throw new IOException("Exception occurred closing Repository", e);
- }
- }
-
- MapWritable mw = new MapWritable();
-
- protected MapWritable transformRow(BindingSet bindingSet) {
- mw.clear(); //handle the case of optional bindings. -mbraun
- for (String name : bindingSet.getBindingNames()) {
- final Text key = new Text(name);
- final Text value = new Text(bindingSet.getValue(name).stringValue());
- mw.put(key, value);
- }
- return mw;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
deleted file mode 100644
index 4b369ae..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Date;
-
-/**
- * Class SparqlTestDriver
- * Date: Oct 28, 2010
- * Time: 2:53:39 PM
- */
-public class SparqlTestDriver implements Tool {
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new SparqlTestDriver(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private Configuration conf;
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public int run(String[] args) throws IOException, InterruptedException,
- ClassNotFoundException {
-
- //query from file
- if(args.length < 2) {
- throw new IllegalArgumentException("Usage: hadoop jar mvm.mmrts.rdf.partition.mr.SparqlTestDriver <local query file> outputFile");
- }
-
- FileInputStream fis = new FileInputStream(args[0]);
- String query = new String(ByteStreams.toByteArray(fis));
- fis.close();
-
- Job job = new Job(conf);
- job.setJarByClass(SparqlTestDriver.class);
-
- // set up cloudbase input
- job.setInputFormatClass(SparqlPartitionStoreInputFormat.class);
- SparqlPartitionStoreInputFormat.setInputInfo(job, "root", "password".getBytes());
- SparqlPartitionStoreInputFormat.setZooKeeperInstance(job, "stratus", "10.40.190.113:2181");
- SparqlPartitionStoreInputFormat.setLongJob(job, null);
- SparqlPartitionStoreInputFormat.setTable(job, "partitionRdf");
-
- long startTime_l = 1303811164088l;
- long ttl = 86400000;
-
- //set query
-// String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-// "SELECT * WHERE\n" +
-// "{\n" +
-// "?id tdp:reportedAt ?timestamp. \n" +
-// "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314380456900 , 1314384056900 , 'XMLDATETIME')).\n" +
-// "?id tdp:performedBy ?system.\n" +
-// "} \n";
-//
-// String query2 = "PREFIX hb: <http://here/2010/tracked-data-provenance/heartbeat/ns#>\n" +
-// "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-// "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-// "SELECT * WHERE\n" +
-// "{\n" +
-// "?id hb:timeStamp ?timestamp. \n" +
-// "FILTER(mvmpart:timeRange(?id, hb:timeStamp, 1314360009522 , 1314367209522 , 'TIMESTAMP')).\n" +
-// "?id hb:count ?count.\n" +
-// "?id hb:systemName ?systemName.\n" +
-// "} ";
-
- System.out.println(query);
- System.out.println();
-// System.out.println(query2);
-
- SparqlPartitionStoreInputFormat.setSparqlQueries(job, query);
-// SparqlCloudbaseStoreInputFormat.setStartTime(job, 1309956861000l + "");
-// SparqlCloudbaseStoreInputFormat.setTtl(job, 86400000 + "");
-
- // set input output of the particular job
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- //job.setOutputFormatClass(FileOutputFormat.class);
-
-
- // set mapper and reducer classes
- job.setMapperClass(MyTempMapper.class);
- job.setReducerClass(Reducer.class);
- job.setNumReduceTasks(1);
-
- // set output
- Path outputDir = new Path(args[1]);
- FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
- if (dfs.exists(outputDir))
- dfs.delete(outputDir, true);
-
- FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return (int) job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- public static class MyTempMapper extends Mapper<LongWritable, MapWritable, Text, Text> {
- Text outKey = new Text();
- Text outValue = new Text("partition");
- @Override
- protected void map(LongWritable key, MapWritable value, Context context) throws IOException, InterruptedException {
- outKey.set(value.values().toString());
- context.write(outKey, outValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
deleted file mode 100644
index 80255ba..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Bytes;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-
-import java.io.IOException;
-import java.util.Date;
-
-/**
- * Class SparqlTestDriver
- * Date: Oct 28, 2010
- * Time: 2:53:39 PM
- */
-public class TestDriver implements Tool {
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new TestDriver(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private Configuration conf;
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public int run(String[] args) throws IOException, InterruptedException,
- ClassNotFoundException {
-
- Job job = new Job(conf);
- job.setJarByClass(TestDriver.class);
-
- FileInputFormat.addInputPaths(job, "/temp/rpunnoose/results.txt");
- job.setInputFormatClass(TextInputFormat.class);
-
- // set input output of the particular job
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(MapWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- job.setOutputFormatClass(TextOutputFormat.class);
-
- // set mapper and reducer classes
- job.setMapperClass(SubjectMapWrMapper.class);
- job.setReducerClass(OutMapWrReducer.class);
- job.setNumReduceTasks(1);
-// job.setNumReduceTasks(0);
-
- // set output
- Path outputDir = new Path("/temp/rpunnoose/partBS");
- FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
- if (dfs.exists(outputDir))
- dfs.delete(outputDir, true);
-
- FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return (int) job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- public static class SubjectMapWrMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
- Text outKey = new Text();
- final String ID = "id";
- final Text ID_TXT = new Text(ID);
- final String PERF_AT = "performedBy";
- final Text PERF_AT_TXT = new Text("system");
- final String REPORT_AT = "reportedAt";
- final Text REPORT_AT_TXT = new Text("timestamp");
- final String TYPE = "type";
- final Text TYPE_TXT = new Text(TYPE);
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String s = value.toString();
- int i = s.lastIndexOf("\0");
- Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(s.substring(0, i).getBytes()), PartitionConstants.VALUE_FACTORY);
- String predStr = stmt.getPredicate().stringValue();
- if (!predStr.contains(PERF_AT) && !predStr.contains(REPORT_AT) && !predStr.contains(TYPE))
- return;
-
- outKey.set(stmt.getSubject().stringValue());
- MapWritable mw = new MapWritable();
- mw.put(ID_TXT, outKey);
- if (predStr.contains(PERF_AT))
- mw.put(PERF_AT_TXT, new Text(stmt.getObject().stringValue()));
- else if (predStr.contains(REPORT_AT))
- mw.put(REPORT_AT_TXT, new Text(stmt.getObject().stringValue()));
- else if (predStr.contains(TYPE))
- mw.put(TYPE_TXT, new Text(stmt.getObject().stringValue()));
-
- context.write(outKey, mw);
- }
- }
-
- public static class OutMapWrReducer extends Reducer<Text, MapWritable, Text, Text> {
- final Text PART = new Text("partitionBS");
- Text outKey = new Text();
-
- @Override
- protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
- MapWritable mw = new MapWritable();
- for (MapWritable value : values) {
- mw.putAll(value);
- }
- outKey.set(mw.values().toString());
- context.write(outKey, PART);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
deleted file mode 100644
index 2b4565f..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
+++ /dev/null
@@ -1,229 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.compat;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table
- * Class MoveShardIndexTool
- * Date: Dec 8, 2011
- * Time: 4:11:40 PM
- */
-public class ChangeShardDateFormatTool implements Tool {
- public static final String CB_USERNAME_PROP = "cb.username";
- public static final String CB_PWD_PROP = "cb.pwd";
- public static final String CB_ZK_PROP = "cb.zk";
- public static final String CB_INSTANCE_PROP = "cb.instance";
- public static final String PARTITION_TABLE_PROP = "partition.table";
- public static final String OLD_DATE_FORMAT_PROP = "date.format.old";
- public static final String NEW_DATE_FORMAT_PROP = "date.format.new";
- public static final String OLD_DATE_SHARD_DELIM = "date.shard.delim.old";
- public static final String NEW_DATE_SHARD_DELIM = "date.shard.delim.new";
-
-
- private Configuration conf;
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String zk = "10.40.190.113:2181";
- private String partitionTable = "rdfPartition";
- private String oldDateFormat = "yyyy-MM";
- private String newDateFormat = "yyyyMMdd";
- private String oldDateDelim = "-";
- private String newDateDelim = "_";
-
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new ChangeShardDateFormatTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public long runJob(String[] args) throws Exception {
- //faster
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.set("io.sort.mb", "256");
-
- zk = conf.get(CB_ZK_PROP, zk);
- instance = conf.get(CB_INSTANCE_PROP, instance);
- userName = conf.get(CB_USERNAME_PROP, userName);
- pwd = conf.get(CB_PWD_PROP, pwd);
- partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
- oldDateFormat = conf.get(OLD_DATE_FORMAT_PROP, oldDateFormat);
- newDateFormat = conf.get(NEW_DATE_FORMAT_PROP, newDateFormat);
- oldDateDelim = conf.get(OLD_DATE_SHARD_DELIM, oldDateDelim);
- newDateDelim = conf.get(NEW_DATE_SHARD_DELIM, newDateDelim);
- conf.set(NEW_DATE_FORMAT_PROP, newDateFormat);
- conf.set(OLD_DATE_FORMAT_PROP, oldDateFormat);
- conf.set(PARTITION_TABLE_PROP, partitionTable);
- conf.set(OLD_DATE_SHARD_DELIM, oldDateDelim);
- conf.set(NEW_DATE_SHARD_DELIM, newDateDelim);
-
- Job job = new Job(conf);
- job.setJarByClass(ChangeShardDateFormatTool.class);
-
- job.setInputFormatClass(CloudbaseInputFormat.class);
- //TODO: How should I send in Auths?
- CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
- partitionTable, CBConstants.NO_AUTHS);
- CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
-
- job.setMapperClass(ChangeDateFormatMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-
- job.setOutputFormatClass(CloudbaseOutputFormat.class);
- CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, partitionTable);
- CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
- job.setNumReduceTasks(0);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- public static class ChangeDateFormatMapper extends Mapper<Key, Value, Text, Mutation> {
- private SimpleDateFormat oldDateFormat_df;
- private SimpleDateFormat newDateFormat_df;
- private Text partTableTxt;
- private String newDateDelim;
- private String oldDateDelim;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- String oldDateFormat = context.getConfiguration().get(OLD_DATE_FORMAT_PROP);
- if (oldDateFormat == null)
- throw new IllegalArgumentException("Old Date Format property cannot be null");
-
- oldDateFormat_df = new SimpleDateFormat(oldDateFormat);
-
- String newDateFormat = context.getConfiguration().get(NEW_DATE_FORMAT_PROP);
- if (newDateFormat == null)
- throw new IllegalArgumentException("New Date Format property cannot be null");
-
- newDateFormat_df = new SimpleDateFormat(newDateFormat);
-
- String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
- if (partTable == null)
- throw new IllegalArgumentException("Partition Table property cannot be null");
-
- partTableTxt = new Text(partTable);
-
- oldDateDelim = context.getConfiguration().get(OLD_DATE_SHARD_DELIM);
- if (oldDateDelim == null)
- throw new IllegalArgumentException("Old Date Shard Delimiter property cannot be null");
-
- newDateDelim = context.getConfiguration().get(NEW_DATE_SHARD_DELIM);
- if (newDateDelim == null)
- throw new IllegalArgumentException("New Date Shard Delimiter property cannot be null");
-
- }
-
- @Override
- protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
- try {
- String cf = key.getColumnFamily().toString();
- if ("event".equals(cf) || "index".equals(cf)) {
- String shard = key.getRow().toString();
- int shardIndex = shard.lastIndexOf(oldDateDelim);
- if (shardIndex == -1)
- return; //no shard?
- String date_s = shard.substring(0, shardIndex);
- String shardValue = shard.substring(shardIndex + 1, shard.length());
-
- Date date = oldDateFormat_df.parse(date_s);
- String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue;
-
- Mutation mutation = new Mutation(new Text(newShard));
- mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
- new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
- context.write(partTableTxt, mutation);
-
- //delete
- mutation = new Mutation(key.getRow());
- mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
-
- context.write(partTableTxt, mutation);
- } else {
- //shard index
- String shard = key.getColumnFamily().toString();
- int shardIndex = shard.lastIndexOf(oldDateDelim);
- if (shardIndex == -1)
- return; //no shard?
-
- String date_s = shard.substring(0, shardIndex);
- String shardValue = shard.substring(shardIndex + 1, shard.length());
-
- Date date = oldDateFormat_df.parse(date_s);
- String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue;
-
- Mutation mutation = new Mutation(key.getRow());
- mutation.put(new Text(newShard), key.getColumnQualifier(),
- new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
-
- //delete
- mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
- context.write(partTableTxt, mutation);
- }
- } catch (ParseException pe) {
- //only do work for the rows that match the old date format
- //throw new IOException(pe);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
deleted file mode 100644
index ba2eece..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.compat;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Date;
-
-/**
- * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table
- * Class MoveShardIndexTool
- * Date: Dec 8, 2011
- * Time: 4:11:40 PM
- */
-public class MoveShardIndexTool implements Tool {
- public static final String CB_USERNAME_PROP = "cb.username";
- public static final String CB_PWD_PROP = "cb.pwd";
- public static final String CB_ZK_PROP = "cb.zk";
- public static final String CB_INSTANCE_PROP = "cb.instance";
- public static final String PARTITION_TABLE_PROP = "partition.table";
- public static final String SHARD_INDEX_TABLE_PROP = "shard.index.table";
- public static final String SHARD_INDEX_DELETE_PROP = "shard.index.delete";
-
-
- private Configuration conf;
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String zk = "10.40.190.113:2181";
- private String partitionTable = "rdfPartition";
- private String shardIndexTable = "rdfShardIndex";
-
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new MoveShardIndexTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public long runJob(String[] args) throws Exception {
- //faster
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.set("io.sort.mb", "256");
-
- zk = conf.get(CB_ZK_PROP, zk);
- instance = conf.get(CB_INSTANCE_PROP, instance);
- userName = conf.get(CB_USERNAME_PROP, userName);
- pwd = conf.get(CB_PWD_PROP, pwd);
- partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
- shardIndexTable = conf.get(SHARD_INDEX_TABLE_PROP, shardIndexTable);
- conf.set(SHARD_INDEX_TABLE_PROP, shardIndexTable);
- conf.set(PARTITION_TABLE_PROP, partitionTable);
-
- Job job = new Job(conf);
- job.setJarByClass(MoveShardIndexTool.class);
-
- job.setInputFormatClass(CloudbaseInputFormat.class);
- //TODO: How should I send in Auths?
- CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
- partitionTable, CBConstants.NO_AUTHS);
- CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
- CloudbaseInputFormat.setRanges(job, Collections.singleton(
- new Range(
- new Text(PartitionConstants.URI_MARKER_STR),
- new Text(PartitionConstants.PLAIN_LITERAL_MARKER_STR))));
-
- job.setMapperClass(ShardKeyValueToMutationMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-
- job.setOutputFormatClass(CloudbaseOutputFormat.class);
- CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, shardIndexTable);
- CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
- job.setNumReduceTasks(0);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- public static class ShardKeyValueToMutationMapper extends Mapper<Key, Value, Text, Mutation> {
- private Text shardTableTxt;
- private Text partTableTxt;
- protected boolean deletePrevShardIndex;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- String shardTable = context.getConfiguration().get(SHARD_INDEX_TABLE_PROP);
- if (shardTable == null)
- throw new IllegalArgumentException("Shard Table property cannot be null");
-
- shardTableTxt = new Text(shardTable);
-
- String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
- if (partTable == null)
- throw new IllegalArgumentException("Partition Table property cannot be null");
-
- partTableTxt = new Text(partTable);
-
- deletePrevShardIndex = context.getConfiguration().getBoolean(SHARD_INDEX_DELETE_PROP, false);
- System.out.println("Deleting shard index from previous: " + deletePrevShardIndex + " Part: " + partTableTxt);
- }
-
- @Override
- protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
- Mutation mutation = new Mutation(key.getRow());
- mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
- new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
-
- context.write(shardTableTxt, mutation);
-
- if (deletePrevShardIndex) {
- mutation = new Mutation(key.getRow());
- mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
-
- context.write(partTableTxt, mutation);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index b347a56..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Be able to input multiple rdf formatted files. Convert from rdf format to statements.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, BytesWritable> {
-
- public static final String RDF_FILE_FORMAT = "mvm.mmrts.rdf.cloudbase.sail.mr.fileinput.rdfformat";
-
- @Override
- public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- return new RdfFileRecordReader();
- }
-
- private class RdfFileRecordReader extends RecordReader<LongWritable, BytesWritable> implements RDFHandler {
-
- boolean closed = false;
- long count = 0;
- BlockingQueue<BytesWritable> queue = new LinkedBlockingQueue<BytesWritable>();
- int total = 0;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- FileSplit fileSplit = (FileSplit) inputSplit;
- Configuration conf = taskAttemptContext.getConfiguration();
- String rdfForm_s = conf.get(RDF_FILE_FORMAT, RDFFormat.RDFXML.getName());
- RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-
- Path file = fileSplit.getPath();
- FileSystem fs = file.getFileSystem(conf);
- FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
- RDFParser rdfParser = Rio.createParser(rdfFormat);
- rdfParser.setRDFHandler(this);
- try {
- rdfParser.parse(fileIn, "");
- } catch (Exception e) {
- throw new IOException(e);
- }
- fileIn.close();
- total = queue.size();
- //TODO: Make this threaded so that you don't hold too many statements before sending them
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return queue.size() > 0;
- }
-
- @Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return new LongWritable(count++);
- }
-
- @Override
- public BytesWritable getCurrentValue() throws IOException, InterruptedException {
- return queue.poll();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return ((float) (total - queue.size())) / ((float) total);
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- }
-
- @Override
- public void startRDF() throws RDFHandlerException {
- }
-
- @Override
- public void endRDF() throws RDFHandlerException {
- }
-
- @Override
- public void handleNamespace(String s, String s1) throws RDFHandlerException {
- }
-
- @Override
- public void handleStatement(Statement statement) throws RDFHandlerException {
- try {
- byte[] stmt_bytes = RdfIO.writeStatement(statement, true);
- queue.add(new BytesWritable(stmt_bytes));
- } catch (IOException e) {
- throw new RDFHandlerException(e);
- }
- }
-
- @Override
- public void handleComment(String s) throws RDFHandlerException {
- }
- }
-//
-// public static RDFParser createRdfParser(RDFFormat rdfFormat) {
-// if (RDFFormat.RDFXML.equals(rdfFormat)) {
-// return new RDFXMLParserFactory().getParser();
-// } else if (RDFFormat.N3.equals(rdfFormat)) {
-// return new N3ParserFactory().getParser();
-// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) {
-// return new NTriplesParserFactory().getParser();
-// } else if (RDFFormat.TRIG.equals(rdfFormat)) {
-// return new TriGParserFactory().getParser();
-// } else if (RDFFormat.TRIX.equals(rdfFormat)) {
-// return new TriXParserFactory().getParser();
-// } else if (RDFFormat.TURTLE.equals(rdfFormat)) {
-// return new TurtleParserFactory().getParser();
-// }
-// throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]");
-// }
-//
-// public static RDFWriter createRdfWriter(RDFFormat rdfFormat, OutputStream os) {
-// if (RDFFormat.RDFXML.equals(rdfFormat)) {
-// return new RDFXMLWriterFactory().getWriter(os);
-// } else if (RDFFormat.N3.equals(rdfFormat)) {
-// return new N3WriterFactory().getWriter(os);
-// } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) {
-// return new NTriplesWriterFactory().getWriter(os);
-// } else if (RDFFormat.TRIG.equals(rdfFormat)) {
-// return new TriGWriterFactory().getWriter(os);
-// } else if (RDFFormat.TRIX.equals(rdfFormat)) {
-// return new TriXWriterFactory().getWriter(os);
-// } else if (RDFFormat.TURTLE.equals(rdfFormat)) {
-// return new TurtleWriterFactory().getWriter(os);
-// }
-// throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]");
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
deleted file mode 100644
index 12c1a4e..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-import static mvm.mmrts.rdf.partition.PartitionConstants.EMPTY_VALUE;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Do bulk import of rdf files
- * Class RdfFileInputToCloudbaseTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputToCloudbaseTool implements Tool {
-
- public static final String CB_USERNAME_PROP = "cb.username";
- public static final String CB_PWD_PROP = "cb.pwd";
- public static final String CB_SERVER_PROP = "cb.server";
- public static final String CB_PORT_PROP = "cb.port";
- public static final String CB_INSTANCE_PROP = "cb.instance";
- public static final String CB_TTL_PROP = "cb.ttl";
- public static final String CB_TABLE_PROP = "cb.table";
-
-
- private Configuration conf;
-
- private String userName = "root";
- private String pwd = "password";
- private String instance = "stratus";
- private String server = "10.40.190.113";
- private String port = "2181";
- private String table = "partitionRdf";
-
-
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new RdfFileInputToCloudbaseTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // speed up the load: disable speculative execution and raise the sort buffer
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.set("io.sort.mb", "256");
-
- server = conf.get(CB_SERVER_PROP, server);
- port = conf.get(CB_PORT_PROP, port);
- instance = conf.get(CB_INSTANCE_PROP, instance);
- userName = conf.get(CB_USERNAME_PROP, userName);
- pwd = conf.get(CB_PWD_PROP, pwd);
- table = conf.get(CB_TABLE_PROP, table);
- conf.set(CB_TABLE_PROP, table);
-
- Job job = new Job(conf);
- job.setJarByClass(RdfFileInputToCloudbaseTool.class);
-
- // set up RDF file input
- job.setInputFormatClass(RdfFileInputFormat.class);
- RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
- // set the map output and final output key/value classes
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(BytesWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
-
- job.setOutputFormatClass(CloudbaseOutputFormat.class);
- CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, table);
- CloudbaseOutputFormat.setZooKeeperInstance(job, instance, server + ":" + port);
-
- // set mapper and reducer classes
- job.setMapperClass(OutSubjStmtMapper.class);
- job.setReducerClass(StatementToMutationReducer.class);
-
- // set output
-// Path outputDir = new Path("/temp/sparql-out/testout");
-// FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-// if (dfs.exists(outputDir))
-// dfs.delete(outputDir, true);
-//
-// FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public static class OutSubjStmtMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {
-
- public OutSubjStmtMapper() {
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
-
- @Override
- protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
- Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance());
- context.write(new Text(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR), value);
- }
-
- }
-
- public static class StatementToMutationReducer extends Reducer<Text, BytesWritable, Text, Mutation> {
- private Text outputTable;
- private DateHashModShardValueGenerator gen;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- outputTable = new Text(context.getConfiguration().get(CB_TABLE_PROP, null));
- gen = new DateHashModShardValueGenerator();
- }
-
- @Override
- protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
- Resource subject = (Resource) RdfIO.readValue(ByteStreams.newDataInput(key.getBytes()), ValueFactoryImpl.getInstance(), FAMILY_DELIM);
- byte[] subj_bytes = writeValue(subject);
- String shard = gen.generateShardValue(subject);
- Text shard_txt = new Text(shard);
-
- /**
- * Triple ->
- * - <subject> <shard>:
- * - <shard> event:<subject>\0<predicate>\0<object>\0
- * - <shard> index:<predicate>\1<object>\0
- */
- Mutation m_subj = new Mutation(shard_txt);
- for (BytesWritable stmt_bytes : values) {
- Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(stmt_bytes.getBytes()), ValueFactoryImpl.getInstance());
- m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE);
- m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE);
- }
-
- /**
- * TODO: Is this right?
- * If the subject does not have any authorizations specified, then anyone can access it.
- * But the true authorization check will happen at the predicate/object level, which means that
- * the set returned will only be what the person is authorized to see. The shard lookup table has to
- * have the lowest-level authorization of all the predicate/object authorizations; otherwise,
- * a user may not be able to see the correct document.
- */
- Mutation m_shard = new Mutation(new Text(subj_bytes));
- m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE);
-
- context.write(outputTable, m_subj);
- context.write(outputTable, m_shard);
- }
- }
-}
-
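For anyone tracking this down later: a minimal, hypothetical driver sketch showing how the deleted bulk-load tool was wired up. The cb.* property names and defaults come from the class itself; the driver class name, host, credentials, and input path below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import mvm.mmrts.rdf.partition.mr.fileinput.RdfFileInputToCloudbaseTool;

    public class RdfBulkLoadDriverSketch {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Properties read in runJob(); the values here are placeholders.
            conf.set(RdfFileInputToCloudbaseTool.CB_USERNAME_PROP, "root");
            conf.set(RdfFileInputToCloudbaseTool.CB_PWD_PROP, "secret");
            conf.set(RdfFileInputToCloudbaseTool.CB_INSTANCE_PROP, "stratus");
            conf.set(RdfFileInputToCloudbaseTool.CB_SERVER_PROP, "zk-host");
            conf.set(RdfFileInputToCloudbaseTool.CB_PORT_PROP, "2181");
            conf.set(RdfFileInputToCloudbaseTool.CB_TABLE_PROP, "partitionRdf");
            // The tool's single argument is the HDFS directory of RDF input files.
            System.exit(ToolRunner.run(conf, new RdfFileInputToCloudbaseTool(),
                    new String[]{"/data/rdf-input"}));
        }
    }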
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
deleted file mode 100644
index e677d12..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
+++ /dev/null
@@ -1,159 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.FAMILY_DELIM_STR;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Does a bulk import of RDF files, writing statements grouped by subject to text files
- * Class RdfFileInputToFileTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputToFileTool implements Tool {
-
- private Configuration conf;
-
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static void main(String[] args) {
- try {
- ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(), args);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- if (args.length < 2)
- throw new IllegalArgumentException("Usage: RdfFileInputToFileTool <input directory> <output directory>");
-
- // speed up the load: disable speculative execution and raise the sort buffer
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.set("io.sort.mb", "256");
-
- Job job = new Job(conf);
- job.setJarByClass(RdfFileInputToFileTool.class);
-
- // set up RDF file input
- job.setInputFormatClass(RdfFileInputFormat.class);
- RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
- // set the map output and final output key/value classes
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
-
- // set mapper and reducer classes
- job.setMapperClass(StmtToBytesMapper.class);
- job.setReducerClass(StmtBytesReducer.class);
-
- // set output
- job.setOutputFormatClass(TextOutputFormat.class);
- Path outputDir = new Path(args[1]);
- FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
- if (dfs.exists(outputDir))
- dfs.delete(outputDir, true);
-
- FileOutputFormat.setOutputPath(job, outputDir);
-
- // Submit the job
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
- if (exitCode == 0) {
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took "
- + (end_time.getTime() - startTime.getTime()) / 1000
- + " seconds.");
- return job
- .getCounters()
- .findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS").getValue();
- } else {
- System.out.println("Job Failed!!!");
- }
-
- return -1;
- }
-
- @Override
- public int run(String[] args) throws Exception {
- runJob(args);
- return 0;
- }
-
- public static class StmtToBytesMapper extends Mapper<LongWritable, BytesWritable, Text, Text> {
-
- Text outKey = new Text();
- Text outValue = new Text();
-
- public StmtToBytesMapper() {
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
-
- @Override
- protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
- Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance());
- outKey.set(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR);
- outValue.set(value.getBytes());
- context.write(outKey, outValue);
- }
-
- }
-
- public static class StmtBytesReducer extends Reducer<Text, Text, NullWritable, Text> {
-
- NullWritable outKey = NullWritable.get();
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- }
-
- @Override
- protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- for (Text stmt_txt : values) {
- context.write(outKey, stmt_txt);
- }
- }
- }
-}
-
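Similarly, a hypothetical invocation sketch for the file-output variant above, matching its two-argument usage string; the driver class name and both paths are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import mvm.mmrts.rdf.partition.mr.fileinput.RdfFileInputToFileTool;

    public class RdfToFileDriverSketch {

        public static void main(String[] args) throws Exception {
            // Usage per the tool: <input directory> <output directory>
            System.exit(ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(),
                    new String[]{"/data/rdf-input", "/data/rdf-by-subject"}));
        }
    }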