Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:36 UTC

[24/49] incubator-rya git commit: RYA-7 POM and License Clean-up for Apache Move

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/assembly/job.xml b/partition/mr.partition.rdf/src/main/assembly/job.xml
deleted file mode 100644
index 259b917..0000000
--- a/partition/mr.partition.rdf/src/main/assembly/job.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<assembly
-	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-	<id>job</id>
-	<formats>
-		<format>jar</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<dependencySets>
-		<dependencySet>
-			<unpack>false</unpack>
-			<scope>runtime</scope>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>org.apache.hadoop:hadoop-core</exclude>
-				<exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
-			</excludes>
-		</dependencySet>
-		<dependencySet>
-			<unpack>false</unpack>
-			<scope>system</scope>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
-	<fileSets>
-		<fileSet>
-			<directory>${basedir}/target/classes</directory>
-			<outputDirectory>/</outputDirectory>
-			<excludes>
-				<exclude>*.jar</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy b/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
deleted file mode 100644
index e5e02ec..0000000
--- a/partition/mr.partition.rdf/src/main/groovy/convertrdfdir.groovy
+++ /dev/null
@@ -1,33 +0,0 @@
-import org.openrdf.rio.rdfxml.*
-import org.openrdf.rio.ntriples.NTriplesWriterFactory
-import org.openrdf.rio.RDFHandler
-
-@Grab(group='com.google.guava', module='guava', version='r06')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-rdfxml', version='2.3.2')
-@Grab(group='org.openrdf.sesame', module='sesame-rio-ntriples', version='2.3.2')
-@Grab(group='org.slf4j', module='slf4j-simple', version='1.5.8')
-def convertDirRdfFormat(def dir, def outputFile) {
-  //read each file
-  assert dir.isDirectory()
-
-  def ntriplesWriter = NTriplesWriterFactory.newInstance().getWriter(new FileOutputStream(outputFile))
-
-  ntriplesWriter.startRDF()
-  dir.listFiles().each { it ->
-    //load file into rdfxml parser
-    def rdfxmlParser = RDFXMLParserFactory.newInstance().getParser()
-    rdfxmlParser.setRDFHandler(
-        [       startRDF: {},
-                endRDF: {},
-                handleNamespace: { def prefix, def uri -> ntriplesWriter.handleNamespace(prefix, uri)},
-                handleComment: {},
-                handleStatement: { def stmt ->  ntriplesWriter.handleStatement stmt}] as RDFHandler
-    )
-    rdfxmlParser.parse(new FileInputStream(it), "")
-  }
-  ntriplesWriter.endRDF()
-}
-
-try{
-convertDirRdfFormat(new File(args[0]), new File(args[1]))
-}catch(Exception e) {e.printStackTrace();}
\ No newline at end of file
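
The deleted Groovy script above streams a directory of RDF/XML files into a single N-Triples file by bridging an RDF/XML parser to an N-Triples writer through an RDFHandler. For reference, a minimal single-file Java equivalent using the same Sesame Rio API the script grabs (the class name and argument handling here are illustrative, not part of the commit) could look like:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.openrdf.rio.RDFFormat;
    import org.openrdf.rio.RDFParser;
    import org.openrdf.rio.RDFWriter;
    import org.openrdf.rio.Rio;

    public class RdfXmlToNTriples {
        public static void main(String[] args) throws Exception {
            // args[0] = input RDF/XML file, args[1] = output N-Triples file
            RDFParser parser = Rio.createParser(RDFFormat.RDFXML);
            FileOutputStream out = new FileOutputStream(args[1]);
            FileInputStream in = new FileInputStream(args[0]);
            try {
                // RDFWriter implements RDFHandler, so parsed statements stream
                // straight to the writer; the parser drives startRDF()/endRDF().
                RDFWriter writer = Rio.createWriter(RDFFormat.NTRIPLES, out);
                parser.setRDFHandler(writer);
                parser.parse(in, ""); // empty base URI, as in the deleted script
            } finally {
                in.close();
                out.close();
            }
        }
    }

To merge a whole directory into one output file, as the script does, the writer's startRDF()/endRDF() are driven once around the loop and a delegating RDFHandler forwards only statements and namespaces from each per-file parse.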

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
deleted file mode 100644
index e8b2e5a..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/MrTstBed.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.mr.transform.SparqlCloudbaseIFJob;
-
-import java.io.FileInputStream;
-
-/**
- * Class MrTstBed
- * Date: Sep 1, 2011
- * Time: 9:18:53 AM
- */
-public class MrTstBed {
-    public static void main(String[] args) {
-        try {
-//            String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-//                    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-//                    "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-//                    "SELECT * WHERE\n" +
-//                    "{\n" +
-//                    "?id tdp:reportedAt ?timestamp. \n" +
-//                    "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314898074000 , 1314898374000 , 'XMLDATETIME')).\n" +
-//                    "?id tdp:performedBy ?system.\n" +
-//                    "?id <http://here/2010/cmv/ns#hasMarkingText> \"U\".\n" +
-//                    "?id rdf:type tdp:Sent.\n" +
-//                    "} \n";
-
-            FileInputStream fis = new FileInputStream(args[0]);
-            String query = new String(ByteStreams.toByteArray(fis));
-            fis.close();
-
-//            String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-//                    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-//                    "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-//                    "SELECT * WHERE\n" +
-//                    "{\n" +
-//                    "?id tdp:reportedAt ?timestamp.\n" +
-//                    "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314381770000 , 1314381880000 , 'XMLDATETIME')).\n" +
-//                    "?id tdp:performedBy ?system.\n" +
-//                    "}";
-
-            new SparqlCloudbaseIFJob("partitionRdf", "root", "password", "stratus", "stratus13:2181", "/temp/queryout", MrTstBed.class, query).run();
-
-//            QueryParser parser = (new SPARQLParserFactory()).getParser();
-//            TupleExpr expr = parser.parseQuery(query, "http://www.w3.org/1999/02/22-rdf-syntax-ns#").getTupleExpr();
-//            System.out.println(expr);
-//
-//            final Configuration queryConf = new Configuration();
-//            expr.visit(new FilterTimeIndexVisitor(queryConf));
-//
-//            (new SubjectGroupingOptimizer(queryConf)).optimize(expr, null, null);
-//
-//            System.out.println(expr);
-//
-//            //make sure of only one shardlookup
-//            expr.visit(new QueryModelVisitorBase<RuntimeException>() {
-//                int count = 0;
-//
-//                @Override
-//                public void meetOther(QueryModelNode node) throws RuntimeException {
-//                    super.meetOther(node);
-//                    count++;
-//                    if (count > 1)
-//                        throw new IllegalArgumentException("Query can only have one subject-star lookup");
-//                }
-//            });
-//
-//            final Job job = new Job(queryConf);
-//            job.setJarByClass(MrTstBed.class);
-//
-//            expr.visit(new QueryModelVisitorBase<RuntimeException>() {
-//                @Override
-//                public void meetOther(QueryModelNode node) throws RuntimeException {
-//                    super.meetOther(node);
-//
-//                    //set up CloudbaseBatchScannerInputFormat here
-//                    if (node instanceof ShardSubjectLookup) {
-//                        System.out.println("Lookup: " + node);
-//                        try {
-//                            new SparqlCloudbaseIFTransformer((ShardSubjectLookup) node, queryConf, job, "partitionRdf",
-//                                    "root", "password", "stratus", "stratus13:2181");
-//                        } catch (QueryEvaluationException e) {
-//                            e.printStackTrace();
-//                        }
-//                    }
-//                }
-//            });
-//
-//            Path outputDir = new Path("/temp/sparql-out/testout");
-//            FileSystem dfs = FileSystem.get(outputDir.toUri(), queryConf);
-//            if (dfs.exists(outputDir))
-//                dfs.delete(outputDir, true);
-//
-//            FileOutputFormat.setOutputPath(job, outputDir);
-//
-//            // Submit the job
-//            Date startTime = new Date();
-//            System.out.println("Job started: " + startTime);
-//            job.waitForCompletion(true);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
deleted file mode 100644
index 15c9c79..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlPartitionStoreInputFormat.java
+++ /dev/null
@@ -1,411 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.util.ArgumentChecker;
-import mvm.mmrts.rdf.partition.PartitionSail;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.*;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.*;
-import org.openrdf.repository.Repository;
-import org.openrdf.repository.RepositoryConnection;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.repository.sail.SailRepository;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-
-/**
- * Class SparqlPartitionStoreInputFormat
- * Date: Oct 28, 2010
- * Time: 11:48:17 AM
- */
-public class SparqlPartitionStoreInputFormat extends InputFormat<LongWritable, MapWritable> {
-
-    public static final String PREFIX = "mvm.mmrts.rdf.partition.mr.sparqlinputformat";
-    public static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
-    public static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-    public static final String USERNAME = PREFIX + ".username";
-    public static final String PASSWORD = PREFIX + ".password";
-
-    public static final String INSTANCE_NAME = PREFIX + ".instanceName";
-    public static final String ZK = PREFIX + ".zk";
-
-    public static final String STARTTIME = PREFIX + ".starttime";
-    public static final String ENDTIME = PREFIX + ".endtime";
-    public static final String TABLE = PREFIX + ".table";
-    public static final String SHARD_TABLE = PREFIX + ".shardtable";
-    public static final String SPARQL_QUERIES_PROP = PREFIX + ".sparql";
-    public static final String MR_NUMTHREADS_PROP = PREFIX + ".numthreads";
-//    public static final String RANGE_PROP = PREFIX + ".range";
-//    public static final String NUM_RANGES_PROP = PREFIX + ".numranges";
-//    public static final String TABLE_PREFIX_PROP = PREFIX + ".tablePrefix";
-//    public static final String OFFSET_RANGE_PROP = PREFIX + ".offsetrange";
-
-//    public static final String INFER_PROP = PREFIX + ".infer";
-
-    private static final String UTF_8 = "UTF-8";
-
-    private static final ValueFactory vf = ValueFactoryImpl.getInstance();
-
-    static class SparqlInputSplit extends InputSplit implements Writable {
-
-        protected String sparql;
-        protected String startTime;
-        protected String endTime;
-        protected String table;
-//        private Long offset;
-//        private Long limit;
-
-        private SparqlInputSplit() {
-        }
-
-        private SparqlInputSplit(String sparql, String startTime, String endTime, String table) {
-            this.sparql = sparql;
-            this.startTime = startTime;
-            this.endTime = endTime;
-            this.table = table;
-//            this.offset = offset;
-//            this.limit = limit;
-        }
-
-        @Override
-        public long getLength() throws IOException, InterruptedException {
-            return 0;
-        }
-
-        @Override
-        public String[] getLocations() throws IOException, InterruptedException {
-            return new String[]{sparql};
-        }
-
-        @Override
-        public void write(DataOutput dataOutput) throws IOException {
-            boolean startTimeExists = startTime != null;
-            dataOutput.writeBoolean(startTimeExists);
-            if (startTimeExists)
-                dataOutput.writeUTF(startTime);
-
-            boolean endTimeExists = endTime != null;
-            dataOutput.writeBoolean(endTimeExists);
-            if (endTimeExists)
-                dataOutput.writeUTF(endTime);
-
-            dataOutput.writeUTF(table);
-            dataOutput.writeUTF(sparql);
-        }
-
-        @Override
-        public void readFields(DataInput dataInput) throws IOException {
-            if (dataInput.readBoolean())
-                this.startTime = dataInput.readUTF();
-            if (dataInput.readBoolean())
-                this.endTime = dataInput.readUTF();
-            this.table = dataInput.readUTF();
-            this.sparql = dataInput.readUTF();
-        }
-    }
-
-    /**
-     * Create a SparqlInputSplit for every SPARQL query.<br>
-     * A single query may be separated into numRanges time ranges. For example, with
-     * numRanges of 3, a range of 1 day (in ms), and 1 query, there will be 3 input splits
-     * with the same query: the first range covers now back to one day ago, the second
-     * covers one day ago back to two days ago, and the third covers everything earlier
-     * than two days ago.
-     * <br><br>
-     * If numRanges is not set, or is set to 1, the input split is limited to the configured
-     * startTime and ttl; if those are not set, all time is queried.
-     *
-     * @param job
-     * @return
-     * @throws java.io.IOException
-     * @throws InterruptedException
-     */
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
-        validateOptions(job.getConfiguration());
-        final Collection<String> queries = getSparqlQueries(job.getConfiguration());
-        if (queries == null || queries.size() == 0)
-            throw new IOException("Queries cannot be null or empty");
-
-        String startTime_s = getStartTime(job.getConfiguration());
-        String endTime_s = getEndTime(job.getConfiguration());
-
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-        for (String query : queries) {
-            splits.add(new SparqlInputSplit(query, startTime_s, endTime_s, getTable(job.getConfiguration())));
-        }
-        return splits;
-    }
-
-    @Override
-    public RecordReader<LongWritable, MapWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new SparqlResultsRecordReader(taskAttemptContext.getConfiguration());
-    }
-
-    protected static String getUsername(Configuration conf) {
-        return conf.get(USERNAME);
-    }
-
-    /**
-     * WARNING: The password is stored in the Configuration and shared with all
-     * MapReduce tasks; It is BASE64 encoded to provide a charset safe
-     * conversion to a string, and is not intended to be secure.
-     */
-    protected static String getPassword(Configuration conf) {
-        return new String(Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()));
-    }
-
-    protected static String getInstance(Configuration conf) {
-        return conf.get(INSTANCE_NAME);
-    }
-
-    public static void setSparqlQueries(JobContext job, String... queries) {
-        if (queries == null || queries.length == 0)
-            throw new IllegalArgumentException("Queries cannot be null or empty");
-
-        final Configuration conf = job.getConfiguration();
-        setSparqlQueries(conf, queries);
-    }
-
-    public static void setSparqlQueries(Configuration conf, String... queries) {
-        try {
-            Collection<String> qencs = new ArrayList<String>();
-            for (String query : queries) {
-                final String qenc = URLEncoder.encode(query, UTF_8);
-                qencs.add(qenc);
-            }
-            conf.setStrings(SPARQL_QUERIES_PROP, qencs.toArray(new String[qencs.size()]));
-        } catch (UnsupportedEncodingException e) {
-            //what to do...
-            e.printStackTrace();
-        }
-    }
-
-    public static Collection<String> getSparqlQueries(Configuration conf) {
-        Collection<String> queries = new ArrayList<String>();
-        final Collection<String> qencs = conf.getStringCollection(SPARQL_QUERIES_PROP);
-        for (String qenc : qencs) {
-            queries.add(qenc);
-        }
-        return queries;
-    }
-
-    public static void setLongJob(JobContext job, Long time) {
-        Configuration conf = job.getConfiguration();
-        //need to make the runtime longer, default 30 min
-        time = (time == null) ? 1800000 : time;
-        conf.setLong("mapreduce.tasktracker.healthchecker.script.timeout", time);
-        conf.set("mapred.child.java.opts", "-Xmx1G");
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-    }
-
-    public static void setInputInfo(JobContext job, String user, byte[] passwd) {
-        Configuration conf = job.getConfiguration();
-        if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-            throw new IllegalStateException("Input info can only be set once per job");
-        conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
-
-        ArgumentChecker.notNull(user, passwd);
-        conf.set(USERNAME, user);
-        conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    }
-
-    public static void setEndTime(JobContext job, String endTime) {
-        Configuration conf = job.getConfiguration();
-        conf.set(ENDTIME, endTime);
-    }
-
-    public static String getEndTime(Configuration conf) {
-        return conf.get(ENDTIME);
-    }
-
-    public static void setNumThreads(JobContext job, int numThreads) {
-        Configuration conf = job.getConfiguration();
-        conf.setInt(MR_NUMTHREADS_PROP, numThreads);
-    }
-
-    public static int getNumThreads(Configuration conf) {
-        return conf.getInt(MR_NUMTHREADS_PROP, -1);
-    }
-
-    public static void setTable(JobContext job, String table) {
-        Configuration conf = job.getConfiguration();
-        conf.set(TABLE, table);
-    }
-
-    public static String getTable(Configuration conf) {
-        return conf.get(TABLE);
-    }
-
-    public static void setShardTable(JobContext job, String table) {
-        Configuration conf = job.getConfiguration();
-        conf.set(SHARD_TABLE, table);
-    }
-
-    public static String getShardTable(Configuration conf) {
-        String t = conf.get(SHARD_TABLE);
-        return (t != null) ? t : getTable(conf);
-    }
-
-    public static void setStartTime(JobContext job, String startTime) {
-        Configuration conf = job.getConfiguration();
-        conf.set(STARTTIME, startTime);
-    }
-
-    public static String getStartTime(Configuration conf) {
-        return conf.get(STARTTIME);
-    }
-
-    public static void setZooKeeperInstance(JobContext job, String instanceName, String zk) {
-        Configuration conf = job.getConfiguration();
-        if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-            throw new IllegalStateException("Instance info can only be set once per job");
-        conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-
-        ArgumentChecker.notNull(instanceName, zk);
-        conf.set(INSTANCE_NAME, instanceName);
-        conf.set(ZK, zk);
-    }
-
-    protected static void validateOptions(Configuration conf) throws IOException {
-        if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-            throw new IOException("Input info has not been set.");
-        if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-            throw new IOException("Instance info has not been set.");
-        if (conf.getStrings(SPARQL_QUERIES_PROP) == null)
-            throw new IOException("Sparql queries have not been set.");
-    }
-
-    private class SparqlResultsRecordReader extends RecordReader<LongWritable, MapWritable>
-//            implements TupleQueryResultWriter, Runnable
-    {
-
-        boolean closed = false;
-        long count = 0;
-        BlockingQueue<MapWritable> queue = new LinkedBlockingQueue<MapWritable>();
-        private Repository repo;
-        String query;
-
-        Configuration conf;
-        private TupleQueryResult result;
-        private RepositoryConnection conn;
-
-        public SparqlResultsRecordReader(Configuration conf) {
-            this.conf = conf;
-        }
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
-            try {
-                validateOptions(conf);
-
-                SparqlInputSplit sis = (SparqlInputSplit) inputSplit;
-                this.query = sis.sparql;
-
-                // init RdfCloudTripleStore
-                final PartitionSail store = new PartitionSail(new ZooKeeperInstance(getInstance(conf),
-                        conf.get(ZK)).getConnector(getUsername(conf), getPassword(conf).getBytes()), getTable(conf), getShardTable(conf));
-
-                repo = new SailRepository(store);
-                repo.initialize();
-
-                conn = repo.getConnection();
-                query = URLDecoder.decode(query, UTF_8);
-                TupleQuery tupleQuery = conn.prepareTupleQuery(
-                        QueryLanguage.SPARQL, query);
-
-                if (sis.startTime != null && sis.endTime != null) {
-                    tupleQuery.setBinding(START_BINDING, vf.createLiteral(sis.startTime));
-                    tupleQuery.setBinding(END_BINDING, vf.createLiteral(sis.endTime));
-                }
-
-                int threads = getNumThreads(conf);
-                if (threads > 0) {
-                    tupleQuery.setBinding(NUMTHREADS_PROP, vf.createLiteral(threads));
-                }
-
-                result = tupleQuery.evaluate();
-            } catch (Exception e) {
-                throw new IOException("Exception occurred opening Repository", e);
-            }
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            try {
-                return result.hasNext();
-            } catch (QueryEvaluationException e) {
-                throw new IOException(e);
-            }
-//            return false;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return new LongWritable(count++);
-        }
-
-        @Override
-        public MapWritable getCurrentValue() throws IOException, InterruptedException {
-            try {
-                if (result.hasNext()) {
-                    BindingSet bindingSet = result.next();
-                    return transformRow(bindingSet);
-                }
-                return null;
-            } catch (QueryEvaluationException e) {
-                throw new IOException(e);
-            }
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return (closed) ? (1) : (0);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-            try {
-                conn.close();
-                repo.shutDown();
-            } catch (RepositoryException e) {
-                throw new IOException("Exception occurred closing Repository", e);
-            }
-        }
-
-        MapWritable mw = new MapWritable();
-
-        protected MapWritable transformRow(BindingSet bindingSet) {
-            mw.clear(); //handle the case of optional bindings. -mbraun
-            for (String name : bindingSet.getBindingNames()) {
-                final Text key = new Text(name);
-                final Text value = new Text(bindingSet.getValue(name).stringValue());
-                mw.put(key, value);
-            }
-            return mw;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
deleted file mode 100644
index 4b369ae..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/SparqlTestDriver.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Date;
-
-/**
- * Class SparqlTestDriver
- * Date: Oct 28, 2010
- * Time: 2:53:39 PM
- */
-public class SparqlTestDriver implements Tool {
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new SparqlTestDriver(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private Configuration conf;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public int run(String[] args) throws IOException, InterruptedException,
-            ClassNotFoundException {
-
-        //query from file
-        if(args.length < 2) {
-            throw new IllegalArgumentException("Usage: hadoop jar mvm.mmrts.rdf.partition.mr.SparqlTestDriver <local query file> outputFile");
-        }
-
-        FileInputStream fis = new FileInputStream(args[0]);
-        String query = new String(ByteStreams.toByteArray(fis));
-        fis.close();
-
-        Job job = new Job(conf);
-        job.setJarByClass(SparqlTestDriver.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(SparqlPartitionStoreInputFormat.class);
-        SparqlPartitionStoreInputFormat.setInputInfo(job, "root", "password".getBytes());
-        SparqlPartitionStoreInputFormat.setZooKeeperInstance(job, "stratus", "10.40.190.113:2181");
-        SparqlPartitionStoreInputFormat.setLongJob(job, null);
-        SparqlPartitionStoreInputFormat.setTable(job, "partitionRdf");
-
-        long startTime_l = 1303811164088l;
-        long ttl = 86400000;
-
-        //set query
-//        String query = "PREFIX tdp: <http://here/2010/tracked-data-provenance/ns#>\n" +
-//                "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-//                "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-//                "SELECT * WHERE\n" +
-//                "{\n" +
-//                "?id tdp:reportedAt ?timestamp. \n" +
-//                "FILTER(mvmpart:timeRange(?id, tdp:reportedAt, 1314380456900 , 1314384056900 , 'XMLDATETIME')).\n" +
-//                "?id tdp:performedBy ?system.\n" +
-//                "} \n";
-//
-//        String query2 = "PREFIX hb: <http://here/2010/tracked-data-provenance/heartbeat/ns#>\n" +
-//                "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n" +
-//                "PREFIX mvmpart: <urn:mvm.mmrts.partition.rdf/08/2011#>\n" +
-//                "SELECT * WHERE\n" +
-//                "{\n" +
-//                "?id hb:timeStamp ?timestamp. \n" +
-//                "FILTER(mvmpart:timeRange(?id, hb:timeStamp, 1314360009522 , 1314367209522 , 'TIMESTAMP')).\n" +
-//                "?id hb:count ?count.\n" +
-//                "?id hb:systemName ?systemName.\n" +
-//                "} ";
-
-        System.out.println(query);
-        System.out.println();
-//        System.out.println(query2);
-
-        SparqlPartitionStoreInputFormat.setSparqlQueries(job, query);
-//        SparqlCloudbaseStoreInputFormat.setStartTime(job, 1309956861000l + "");
-//        SparqlCloudbaseStoreInputFormat.setTtl(job, 86400000 + "");
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Text.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-
-        //job.setOutputFormatClass(FileOutputFormat.class);
-
-
-        // set mapper and reducer classes
-        job.setMapperClass(MyTempMapper.class);
-        job.setReducerClass(Reducer.class);
-        job.setNumReduceTasks(1);
-
-        // set output
-        Path outputDir = new Path(args[1]);
-        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-        if (dfs.exists(outputDir))
-            dfs.delete(outputDir, true);
-
-        FileOutputFormat.setOutputPath(job, outputDir);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return (int) job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class MyTempMapper extends Mapper<LongWritable, MapWritable, Text, Text> {
-        Text outKey = new Text();
-        Text outValue = new Text("partition");
-        @Override
-        protected void map(LongWritable key, MapWritable value, Context context) throws IOException, InterruptedException {
-            outKey.set(value.values().toString());
-            context.write(outKey, outValue);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
deleted file mode 100644
index 80255ba..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/TestDriver.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package mvm.mmrts.rdf.partition.mr;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Bytes;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-
-import java.io.IOException;
-import java.util.Date;
-
-/**
- * Class TestDriver
- * Date: Oct 28, 2010
- * Time: 2:53:39 PM
- */
-public class TestDriver implements Tool {
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new TestDriver(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private Configuration conf;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public int run(String[] args) throws IOException, InterruptedException,
-            ClassNotFoundException {
-
-        Job job = new Job(conf);
-        job.setJarByClass(TestDriver.class);
-
-        FileInputFormat.addInputPaths(job, "/temp/rpunnoose/results.txt");
-        job.setInputFormatClass(TextInputFormat.class);
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(MapWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-
-        job.setOutputFormatClass(TextOutputFormat.class);
-
-        // set mapper and reducer classes
-        job.setMapperClass(SubjectMapWrMapper.class);
-        job.setReducerClass(OutMapWrReducer.class);
-        job.setNumReduceTasks(1);
-//        job.setNumReduceTasks(0);
-
-        // set output
-        Path outputDir = new Path("/temp/rpunnoose/partBS");
-        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-        if (dfs.exists(outputDir))
-            dfs.delete(outputDir, true);
-
-        FileOutputFormat.setOutputPath(job, outputDir);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return (int) job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class SubjectMapWrMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
-        Text outKey = new Text();
-        final String ID = "id";
-        final Text ID_TXT = new Text(ID);
-        final String PERF_AT = "performedBy";
-        final Text PERF_AT_TXT = new Text("system");
-        final String REPORT_AT = "reportedAt";
-        final Text REPORT_AT_TXT = new Text("timestamp");
-        final String TYPE = "type";
-        final Text TYPE_TXT = new Text(TYPE);
-
-        @Override
-        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-            String s = value.toString();
-            int i = s.lastIndexOf("\0");
-            Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(s.substring(0, i).getBytes()), PartitionConstants.VALUE_FACTORY);
-            String predStr = stmt.getPredicate().stringValue();
-            if (!predStr.contains(PERF_AT) && !predStr.contains(REPORT_AT) && !predStr.contains(TYPE))
-                return;
-
-            outKey.set(stmt.getSubject().stringValue());
-            MapWritable mw = new MapWritable();
-            mw.put(ID_TXT, outKey);
-            if (predStr.contains(PERF_AT))
-                mw.put(PERF_AT_TXT, new Text(stmt.getObject().stringValue()));
-            else if (predStr.contains(REPORT_AT))
-                mw.put(REPORT_AT_TXT, new Text(stmt.getObject().stringValue()));
-            else if (predStr.contains(TYPE))
-                mw.put(TYPE_TXT, new Text(stmt.getObject().stringValue()));
-
-            context.write(outKey, mw);
-        }
-    }
-
-    public static class OutMapWrReducer extends Reducer<Text, MapWritable, Text, Text> {
-        final Text PART = new Text("partitionBS");
-        Text outKey = new Text();
-
-        @Override
-        protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
-            MapWritable mw = new MapWritable();
-            for (MapWritable value : values) {
-                mw.putAll(value);
-            }
-            outKey.set(mw.values().toString());
-            context.write(outKey, PART);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
deleted file mode 100644
index 2b4565f..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/ChangeShardDateFormatTool.java
+++ /dev/null
@@ -1,229 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.compat;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table
- * Class ChangeShardDateFormatTool
- * Date: Dec 8, 2011
- * Time: 4:11:40 PM
- */
-public class ChangeShardDateFormatTool implements Tool {
-    public static final String CB_USERNAME_PROP = "cb.username";
-    public static final String CB_PWD_PROP = "cb.pwd";
-    public static final String CB_ZK_PROP = "cb.zk";
-    public static final String CB_INSTANCE_PROP = "cb.instance";
-    public static final String PARTITION_TABLE_PROP = "partition.table";
-    public static final String OLD_DATE_FORMAT_PROP = "date.format.old";
-    public static final String NEW_DATE_FORMAT_PROP = "date.format.new";
-    public static final String OLD_DATE_SHARD_DELIM = "date.shard.delim.old";
-    public static final String NEW_DATE_SHARD_DELIM = "date.shard.delim.new";
-
-
-    private Configuration conf;
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String zk = "10.40.190.113:2181";
-    private String partitionTable = "rdfPartition";
-    private String oldDateFormat = "yyyy-MM";
-    private String newDateFormat = "yyyyMMdd";
-    private String oldDateDelim = "-";
-    private String newDateDelim = "_";
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new ChangeShardDateFormatTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public long runJob(String[] args) throws Exception {
-        //faster
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-
-        zk = conf.get(CB_ZK_PROP, zk);
-        instance = conf.get(CB_INSTANCE_PROP, instance);
-        userName = conf.get(CB_USERNAME_PROP, userName);
-        pwd = conf.get(CB_PWD_PROP, pwd);
-        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
-        oldDateFormat = conf.get(OLD_DATE_FORMAT_PROP, oldDateFormat);
-        newDateFormat = conf.get(NEW_DATE_FORMAT_PROP, newDateFormat);
-        oldDateDelim = conf.get(OLD_DATE_SHARD_DELIM, oldDateDelim);
-        newDateDelim = conf.get(NEW_DATE_SHARD_DELIM, newDateDelim);
-        conf.set(NEW_DATE_FORMAT_PROP, newDateFormat);
-        conf.set(OLD_DATE_FORMAT_PROP, oldDateFormat);
-        conf.set(PARTITION_TABLE_PROP, partitionTable);
-        conf.set(OLD_DATE_SHARD_DELIM, oldDateDelim);
-        conf.set(NEW_DATE_SHARD_DELIM, newDateDelim);
-
-        Job job = new Job(conf);
-        job.setJarByClass(ChangeShardDateFormatTool.class);
-
-        job.setInputFormatClass(CloudbaseInputFormat.class);
-        //TODO: How should I send in Auths?
-        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
-                partitionTable, CBConstants.NO_AUTHS);
-        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
-
-        job.setMapperClass(ChangeDateFormatMapper.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, partitionTable);
-        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class ChangeDateFormatMapper extends Mapper<Key, Value, Text, Mutation> {
-        private SimpleDateFormat oldDateFormat_df;
-        private SimpleDateFormat newDateFormat_df;
-        private Text partTableTxt;
-        private String newDateDelim;
-        private String oldDateDelim;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            String oldDateFormat = context.getConfiguration().get(OLD_DATE_FORMAT_PROP);
-            if (oldDateFormat == null)
-                throw new IllegalArgumentException("Old Date Format property cannot be null");
-
-            oldDateFormat_df = new SimpleDateFormat(oldDateFormat);
-
-            String newDateFormat = context.getConfiguration().get(NEW_DATE_FORMAT_PROP);
-            if (newDateFormat == null)
-                throw new IllegalArgumentException("New Date Format property cannot be null");
-
-            newDateFormat_df = new SimpleDateFormat(newDateFormat);
-
-            String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
-            if (partTable == null)
-                throw new IllegalArgumentException("Partition Table property cannot be null");
-
-            partTableTxt = new Text(partTable);
-
-            oldDateDelim = context.getConfiguration().get(OLD_DATE_SHARD_DELIM);
-            if (oldDateDelim == null)
-                throw new IllegalArgumentException("Old Date Shard Delimiter property cannot be null");
-
-            newDateDelim = context.getConfiguration().get(NEW_DATE_SHARD_DELIM);
-            if (newDateDelim == null)
-                throw new IllegalArgumentException("New Date Shard Delimiter property cannot be null");
-
-        }
-
-        @Override
-        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-            try {
-                String cf = key.getColumnFamily().toString();
-                if ("event".equals(cf) || "index".equals(cf)) {
-                    String shard = key.getRow().toString();
-                    int shardIndex = shard.lastIndexOf(oldDateDelim);
-                    if (shardIndex == -1)
-                        return; //no shard?
-                    String date_s = shard.substring(0, shardIndex);
-                    String shardValue = shard.substring(shardIndex + 1, shard.length());
-
-                    Date date = oldDateFormat_df.parse(date_s);
-                    String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue;
-
-                    Mutation mutation = new Mutation(new Text(newShard));
-                    mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
-                            new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
-                    context.write(partTableTxt, mutation);
-
-                    //delete
-                    mutation = new Mutation(key.getRow());
-                    mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
-
-                    context.write(partTableTxt, mutation);
-                } else {
-                    //shard index
-                    String shard = key.getColumnFamily().toString();
-                    int shardIndex = shard.lastIndexOf(oldDateDelim);
-                    if (shardIndex == -1)
-                        return; //no shard?
-
-                    String date_s = shard.substring(0, shardIndex);
-                    String shardValue = shard.substring(shardIndex + 1, shard.length());
-
-                    Date date = oldDateFormat_df.parse(date_s);
-                    String newShard = newDateFormat_df.format(date) + newDateDelim + shardValue;
-                    
-                    Mutation mutation = new Mutation(key.getRow());
-                    mutation.put(new Text(newShard), key.getColumnQualifier(),
-                            new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
-
-                    //delete
-                    mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
-                    context.write(partTableTxt, mutation);
-                }
-            } catch (ParseException pe) {
-                //only do work for the rows that match the old date format
-                //throw new IOException(pe);
-            }
-        }
-    }
-}
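
Stripped of the MapReduce plumbing, the row-key rewrite this mapper performs is a date reparse plus a delimiter swap. A standalone sketch using the tool's default formats (the sample key "2011-12-42" is hypothetical):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ShardKeyRewriteExample {
        public static void main(String[] args) throws ParseException {
            // Defaults from ChangeShardDateFormatTool: old "yyyy-MM" / "-", new "yyyyMMdd" / "_"
            SimpleDateFormat oldFormat = new SimpleDateFormat("yyyy-MM");
            SimpleDateFormat newFormat = new SimpleDateFormat("yyyyMMdd");
            String oldDelim = "-";
            String newDelim = "_";

            String shard = "2011-12-42";                 // old row key: date + delim + shard number
            int i = shard.lastIndexOf(oldDelim);
            String date_s = shard.substring(0, i);       // "2011-12"
            String shardValue = shard.substring(i + 1);  // "42"

            Date date = oldFormat.parse(date_s);
            String newShard = newFormat.format(date) + newDelim + shardValue;
            System.out.println(shard + " -> " + newShard); // 2011-12-42 -> 20111201_42
        }
    }

The mapper applies this rewrite to event/index row keys and to the shard-index column families, deleting the old entries as it goes; keys that do not parse with the old date format are skipped.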

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
deleted file mode 100644
index ba2eece..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/compat/MoveShardIndexTool.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.compat;
-
-import cloudbase.core.CBConstants;
-import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.mmrts.rdf.partition.PartitionConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Date;
-
-/**
- * MMRTS-148 Need to move the shard index from the partition table to the shardIndex table
- * Class MoveShardIndexTool
- * Date: Dec 8, 2011
- * Time: 4:11:40 PM
- */
-public class MoveShardIndexTool implements Tool {
-    public static final String CB_USERNAME_PROP = "cb.username";
-    public static final String CB_PWD_PROP = "cb.pwd";
-    public static final String CB_ZK_PROP = "cb.zk";
-    public static final String CB_INSTANCE_PROP = "cb.instance";
-    public static final String PARTITION_TABLE_PROP = "partition.table";
-    public static final String SHARD_INDEX_TABLE_PROP = "shard.index.table";
-    public static final String SHARD_INDEX_DELETE_PROP = "shard.index.delete";
-
-
-    private Configuration conf;
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String zk = "10.40.190.113:2181";
-    private String partitionTable = "rdfPartition";
-    private String shardIndexTable = "rdfShardIndex";
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new MoveShardIndexTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public long runJob(String[] args) throws Exception {
-        //faster
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-
-        zk = conf.get(CB_ZK_PROP, zk);
-        instance = conf.get(CB_INSTANCE_PROP, instance);
-        userName = conf.get(CB_USERNAME_PROP, userName);
-        pwd = conf.get(CB_PWD_PROP, pwd);
-        partitionTable = conf.get(PARTITION_TABLE_PROP, partitionTable);
-        shardIndexTable = conf.get(SHARD_INDEX_TABLE_PROP, shardIndexTable);
-        conf.set(SHARD_INDEX_TABLE_PROP, shardIndexTable);
-        conf.set(PARTITION_TABLE_PROP, partitionTable);
-
-        Job job = new Job(conf);
-        job.setJarByClass(MoveShardIndexTool.class);
-
-        job.setInputFormatClass(CloudbaseInputFormat.class);
-        //TODO: How should I send in Auths?
-        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
-                partitionTable, CBConstants.NO_AUTHS);
-        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
-        CloudbaseInputFormat.setRanges(job, Collections.singleton(
-                new Range(
-                        new Text(PartitionConstants.URI_MARKER_STR),
-                        new Text(PartitionConstants.PLAIN_LITERAL_MARKER_STR))));
-
-        job.setMapperClass(ShardKeyValueToMutationMapper.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, shardIndexTable);
-        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class ShardKeyValueToMutationMapper extends Mapper<Key, Value, Text, Mutation> {
-        private Text shardTableTxt;
-        private Text partTableTxt;
-        protected boolean deletePrevShardIndex;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            String shardTable = context.getConfiguration().get(SHARD_INDEX_TABLE_PROP);
-            if (shardTable == null)
-                throw new IllegalArgumentException("Shard Table property cannot be null");
-
-            shardTableTxt = new Text(shardTable);
-
-            String partTable = context.getConfiguration().get(PARTITION_TABLE_PROP);
-            if (partTable == null)
-                throw new IllegalArgumentException("Partition Table property cannot be null");
-
-            partTableTxt = new Text(partTable);
-
-            deletePrevShardIndex = context.getConfiguration().getBoolean(SHARD_INDEX_DELETE_PROP, false);
-            System.out.println("Deleting shard index from previous: " + deletePrevShardIndex + " Part: " + partTableTxt);
-        }
-
-        @Override
-        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-            Mutation mutation = new Mutation(key.getRow());
-            mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
-                    new ColumnVisibility(key.getColumnVisibility()), System.currentTimeMillis(), value);
-
-            context.write(shardTableTxt, mutation);
-
-            if (deletePrevShardIndex) {
-                mutation = new Mutation(key.getRow());
-                mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), System.currentTimeMillis());
-
-                context.write(partTableTxt, mutation);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index b347a56..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Reads multiple RDF-formatted files and converts each parsed triple into a serialized statement.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, BytesWritable> {
-
-    public static final String RDF_FILE_FORMAT = "mvm.mmrts.rdf.cloudbase.sail.mr.fileinput.rdfformat";
-
-    @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
-                                                                            TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new RdfFileRecordReader();
-    }
-
-    private class RdfFileRecordReader extends RecordReader<LongWritable, BytesWritable> implements RDFHandler {
-
-        boolean closed = false;
-        long count = 0;
-        BlockingQueue<BytesWritable> queue = new LinkedBlockingQueue<BytesWritable>();
-        int total = 0;
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-            FileSplit fileSplit = (FileSplit) inputSplit;
-            Configuration conf = taskAttemptContext.getConfiguration();
-            String rdfForm_s = conf.get(RDF_FILE_FORMAT, RDFFormat.RDFXML.getName());
-            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-
-            Path file = fileSplit.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
-            RDFParser rdfParser = Rio.createParser(rdfFormat);
-            rdfParser.setRDFHandler(this);
-            try {
-                rdfParser.parse(fileIn, "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            fileIn.close();
-            total = queue.size();
-            //TODO: Make this threaded so that you don't hold too many statements before sending them
-        }
-
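-        // Note: nextKeyValue() only checks the buffer, while getCurrentValue() polls it,
-        // so each nextKeyValue()/getCurrentKey()/getCurrentValue() cycle consumes one statement.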
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return queue.size() > 0;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return new LongWritable(count++);
-        }
-
-        @Override
-        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
-            return queue.poll();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return ((float) (total - queue.size())) / ((float) total);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-        }
-
-        @Override
-        public void startRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void endRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleNamespace(String s, String s1) throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleStatement(Statement statement) throws RDFHandlerException {
-            try {
-                byte[] stmt_bytes = RdfIO.writeStatement(statement, true);
-                queue.add(new BytesWritable(stmt_bytes));
-            } catch (IOException e) {
-                throw new RDFHandlerException(e);
-            }
-        }
-
-        @Override
-        public void handleComment(String s) throws RDFHandlerException {
-        }
-    }
-//
-//    public static RDFParser createRdfParser(RDFFormat rdfFormat) {
-//        if (RDFFormat.RDFXML.equals(rdfFormat)) {
-//            return new RDFXMLParserFactory().getParser();
-//        } else if (RDFFormat.N3.equals(rdfFormat)) {
-//            return new N3ParserFactory().getParser();
-//        } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) {
-//            return new NTriplesParserFactory().getParser();
-//        } else if (RDFFormat.TRIG.equals(rdfFormat)) {
-//            return new TriGParserFactory().getParser();
-//        } else if (RDFFormat.TRIX.equals(rdfFormat)) {
-//            return new TriXParserFactory().getParser();
-//        } else if (RDFFormat.TURTLE.equals(rdfFormat)) {
-//            return new TurtleParserFactory().getParser();
-//        }
-//        throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]");
-//    }
-//
-//    public static RDFWriter createRdfWriter(RDFFormat rdfFormat, OutputStream os) {
-//        if (RDFFormat.RDFXML.equals(rdfFormat)) {
-//            return new RDFXMLWriterFactory().getWriter(os);
-//        } else if (RDFFormat.N3.equals(rdfFormat)) {
-//            return new N3WriterFactory().getWriter(os);
-//        } else if (RDFFormat.NTRIPLES.equals(rdfFormat)) {
-//            return new NTriplesWriterFactory().getWriter(os);
-//        } else if (RDFFormat.TRIG.equals(rdfFormat)) {
-//            return new TriGWriterFactory().getWriter(os);
-//        } else if (RDFFormat.TRIX.equals(rdfFormat)) {
-//            return new TriXWriterFactory().getWriter(os);
-//        } else if (RDFFormat.TURTLE.equals(rdfFormat)) {
-//            return new TurtleWriterFactory().getWriter(os);
-//        }
-//        throw new IllegalArgumentException("Unknown RDFFormat[" + rdfFormat + "]");
-//    }
-
-}

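RdfFileRecordReader above wires a Sesame Rio parser to itself as the RDFHandler and buffers every parsed statement in a BlockingQueue before the mapper consumes it (the TODO notes this is not streamed). The same parsing pattern in isolation, using the Sesame 2.x Rio API the deleted class already imports, could be sketched as follows; the file path and format arguments are placeholders:

    import org.openrdf.model.Statement;
    import org.openrdf.rio.RDFFormat;
    import org.openrdf.rio.RDFHandlerException;
    import org.openrdf.rio.RDFParser;
    import org.openrdf.rio.Rio;
    import org.openrdf.rio.helpers.RDFHandlerBase;

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: parse one RDF file fully into memory, like the record reader's queue. */
    public final class RioParseSketch {

        public static List<Statement> parseToList(String path, RDFFormat format) throws Exception {
            final List<Statement> statements = new ArrayList<Statement>();
            RDFParser parser = Rio.createParser(format);
            parser.setRDFHandler(new RDFHandlerBase() {
                @Override
                public void handleStatement(Statement st) throws RDFHandlerException {
                    statements.add(st);  // buffered, mirroring queue.add(...) above
                }
            });
            InputStream in = new FileInputStream(path);
            try {
                parser.parse(in, "");    // empty base URI, matching the record reader
            } finally {
                in.close();
            }
            return statements;
        }
    }
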
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
deleted file mode 100644
index 12c1a4e..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToCloudbaseTool.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.shard.DateHashModShardValueGenerator;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.*;
-import static mvm.mmrts.rdf.partition.PartitionConstants.EMPTY_VALUE;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeStatement;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Bulk-imports RDF files into a Cloudbase table using MapReduce.
- * Class RdfFileInputToCloudbaseTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputToCloudbaseTool implements Tool {
-
-    public static final String CB_USERNAME_PROP = "cb.username";
-    public static final String CB_PWD_PROP = "cb.pwd";
-    public static final String CB_SERVER_PROP = "cb.server";
-    public static final String CB_PORT_PROP = "cb.port";
-    public static final String CB_INSTANCE_PROP = "cb.instance";
-    public static final String CB_TTL_PROP = "cb.ttl";
-    public static final String CB_TABLE_PROP = "cb.table";
-
-
-    private Configuration conf;
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String server = "10.40.190.113";
-    private String port = "2181";
-    private String table = "partitionRdf";
-
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputToCloudbaseTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-        //faster
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-
-        server = conf.get(CB_SERVER_PROP, server);
-        port = conf.get(CB_PORT_PROP, port);
-        instance = conf.get(CB_INSTANCE_PROP, instance);
-        userName = conf.get(CB_USERNAME_PROP, userName);
-        pwd = conf.get(CB_PWD_PROP, pwd);
-        table = conf.get(CB_TABLE_PROP, table);
-        conf.set(CB_TABLE_PROP, table);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputToCloudbaseTool.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(RdfFileInputFormat.class);
-        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(BytesWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, table);
-        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, server + ":" + port);
-
-        // set mapper and reducer classes
-        job.setMapperClass(OutSubjStmtMapper.class);
-        job.setReducerClass(StatementToMutationReducer.class);
-
-        // set output
-//        Path outputDir = new Path("/temp/sparql-out/testout");
-//        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-//        if (dfs.exists(outputDir))
-//            dfs.delete(outputDir, true);
-//
-//        FileOutputFormat.setOutputPath(job, outputDir);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public static class OutSubjStmtMapper extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {
-
-        public OutSubjStmtMapper() {
-        }
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-        }
-
-        @Override
-        protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
-            Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance());
-            context.write(new Text(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR), value);
-        }
-
-    }
-
-    public static class StatementToMutationReducer extends Reducer<Text, BytesWritable, Text, Mutation> {
-        private Text outputTable;
-        private DateHashModShardValueGenerator gen;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            outputTable = new Text(context.getConfiguration().get(CB_TABLE_PROP, null));
-            gen = new DateHashModShardValueGenerator();
-        }
-
-        @Override
-        protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
-            Resource subject = (Resource) RdfIO.readValue(ByteStreams.newDataInput(key.getBytes()), ValueFactoryImpl.getInstance(), FAMILY_DELIM);
-            byte[] subj_bytes = writeValue(subject);
-            String shard = gen.generateShardValue(subject);
-            Text shard_txt = new Text(shard);
-
-            /**
-             * Each triple is written as three entries (row | column family : column qualifier):
-             *   <subject>  | <shard>  :                                        (shard lookup entry)
-             *   <shard>    | event    : <subject>\0<predicate>\0<object>\0     (document entry)
-             *   <shard>    | index    : <predicate>\1<object>\0                (index entry)
-             */
-            Mutation m_subj = new Mutation(shard_txt);
-            for (BytesWritable stmt_bytes : values) {
-                Statement stmt = RdfIO.readStatement(ByteStreams.newDataInput(stmt_bytes.getBytes()), ValueFactoryImpl.getInstance());
-                m_subj.put(DOC, new Text(writeStatement(stmt, true)), EMPTY_VALUE);
-                m_subj.put(INDEX, new Text(writeStatement(stmt, false)), EMPTY_VALUE);
-            }
-
-            /**
-             * TODO: Is this right?
-             * If the subject has no authorizations specified, then anyone can access it.
-             * The real authorization check happens at the predicate/object level, so the set
-             * returned will contain only what the user is authorized to see. The shard lookup
-             * entry has to carry the lowest-level authorization of all the predicate/object
-             * authorizations; otherwise, a user may not be able to see the correct document.
-             */
-            Mutation m_shard = new Mutation(new Text(subj_bytes));
-            m_shard.put(shard_txt, EMPTY_TXT, EMPTY_VALUE);
-
-            context.write(outputTable, m_subj);
-            context.write(outputTable, m_shard);
-        }
-    }
-}
-

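StatementToMutationReducer above writes three entries per triple: a document entry and an index entry under the shard row, plus a subject-to-shard lookup entry. A structural sketch of that layout follows, again written against the Apache Accumulo API rather than Cloudbase, with the "event"/"index" column families, plain-string values, and \0/\1 delimiters hard-coded purely for illustration (the deleted code takes them from PartitionConstants and serializes values with RdfIO):

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    /** Illustrative only: the shard/document/index row layout described in the reducer's comment. */
    public final class ShardLayoutSketch {

        private static final Text DOC = new Text("event");    // assumed to correspond to PartitionConstants.DOC
        private static final Text INDEX = new Text("index");  // assumed to correspond to PartitionConstants.INDEX
        private static final Value EMPTY = new Value(new byte[0]);

        /** Document and index entries, both stored under the shard row. */
        static Mutation shardRow(String shard, String subj, String pred, String obj) {
            Mutation m = new Mutation(new Text(shard));
            m.put(DOC, new Text(subj + "\u0000" + pred + "\u0000" + obj + "\u0000"), EMPTY);
            m.put(INDEX, new Text(pred + "\u0001" + obj + "\u0000"), EMPTY);
            return m;
        }

        /** Subject-to-shard lookup entry, so a query can find which shard holds a subject. */
        static Mutation subjectLookup(String subj, String shard) {
            Mutation m = new Mutation(new Text(subj));
            m.put(new Text(shard), new Text(""), EMPTY);
            return m;
        }
    }
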
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
----------------------------------------------------------------------
diff --git a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java b/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
deleted file mode 100644
index e677d12..0000000
--- a/partition/mr.partition.rdf/src/main/java/mvm/mmrts/rdf/partition/mr/fileinput/RdfFileInputToFileTool.java
+++ /dev/null
@@ -1,159 +0,0 @@
-package mvm.mmrts.rdf.partition.mr.fileinput;
-
-import com.google.common.io.ByteStreams;
-import mvm.mmrts.rdf.partition.utils.RdfIO;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import java.io.IOException;
-import java.util.Date;
-
-import static mvm.mmrts.rdf.partition.PartitionConstants.FAMILY_DELIM_STR;
-import static mvm.mmrts.rdf.partition.utils.RdfIO.writeValue;
-
-/**
- * Bulk-converts RDF files into text files of serialized statements.
- * Class RdfFileInputToFileTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputToFileTool implements Tool {
-
-    private Configuration conf;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputToFileTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-        if (args.length < 2)
-            throw new IllegalArgumentException("Usage: RdfFileInputToFileTool <input directory> <output directory>");
-
-        //faster
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputToFileTool.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(RdfFileInputFormat.class);
-        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Text.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-
-        // set mapper and reducer classes
-        job.setMapperClass(StmtToBytesMapper.class);
-        job.setReducerClass(StmtBytesReducer.class);
-
-        // set output
-        job.setOutputFormatClass(TextOutputFormat.class);
-        Path outputDir = new Path(args[1]);
-        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-        if (dfs.exists(outputDir))
-            dfs.delete(outputDir, true);
-
-        FileOutputFormat.setOutputPath(job, outputDir);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public static class StmtToBytesMapper extends Mapper<LongWritable, BytesWritable, Text, Text> {
-
-        Text outKey = new Text();
-        Text outValue = new Text();
-
-        public StmtToBytesMapper() {
-        }
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-        }
-
-        @Override
-        protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
-            Statement statement = RdfIO.readStatement(ByteStreams.newDataInput(value.getBytes()), ValueFactoryImpl.getInstance());
-            outKey.set(new String(writeValue(statement.getSubject())) + FAMILY_DELIM_STR);
-            // BytesWritable.getBytes() returns the padded backing array; copy only the valid bytes.
-            outValue.set(value.getBytes(), 0, value.getLength());
-            context.write(outKey, outValue);
-        }
-
-    }
-
-    public static class StmtBytesReducer extends Reducer<Text, Text, NullWritable, Text> {
-
-        NullWritable outKey = NullWritable.get();
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-        }
-
-        @Override
-        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-            for (Text stmt_txt : values) {
-                context.write(outKey, stmt_txt);
-            }
-        }
-    }
-}
-
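
Both file-input tools are driven through ToolRunner, so running them boils down to pointing them at an HDFS directory of RDF files, e.g. hadoop jar <job jar> mvm.mmrts.rdf.partition.mr.fileinput.RdfFileInputToFileTool <input dir> <output dir> (the jar name and directories are placeholders). The Cloudbase variant takes only the input directory and reads its connection settings (cb.instance, cb.server, cb.port, cb.username, cb.pwd, cb.table) from the Hadoop configuration, for example via -D generic options.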