You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rya.apache.org by pu...@apache.org on 2015/12/04 17:46:56 UTC

[44/49] incubator-rya git commit: RYA-7 POM and License Clean-up for Apache Move

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c03b124..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,318 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.Connector;
-import cloudbase.core.client.ZooKeeperInstance;
-import cloudbase.core.client.admin.TableOperations;
-import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import cloudbase.core.util.TextUtil;
-import com.google.common.base.Preconditions;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.extractValue;
-import static mvm.rya.cloudbase.CloudbaseRdfUtils.from;
-
-/**
- * Take large ntrips files and use MapReduce and Cloudbase
- * Bulk ingest techniques to load into the table in our partition format.
- * <p/>
- * Input: NTrips file
- * Map:
- * - key : shard row - Text
- * - value : stmt in doc triple format - Text
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Cloudbase Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
-    public static final String WORKDIR_PROP = "bulk.n3.workdir";
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String zk = "10.40.190.129:2181";
-    private String ttl = null;
-    private String workDirBase = "/temp/bulkcb/work";
-    private String format = RDFFormat.NTRIPLES.getName();
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        final Configuration conf = getConf();
-        try {
-            //conf
-            zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-            ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
-            instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-            userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-            pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-            format = conf.get(MRUtils.FORMAT_PROP, format);
-            conf.set(MRUtils.FORMAT_PROP, format);
-            final String inputDir = args[0];
-
-            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-            Connector connector = zooKeeperInstance.getConnector(userName, pwd);
-            TableOperations tableOperations = connector.tableOperations();
-
-            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (tablePrefix != null)
-                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-            Collection<Job> jobs = new ArrayList<Job>();
-            for (final String tableName : tables) {
-                PrintStream out = null;
-                try {
-                    String workDir = workDirBase + "/" + tableName;
-                    System.out.println("Loading data into table[" + tableName + "]");
-
-                    Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
-                    job.setJarByClass(this.getClass());
-                    //setting long job
-                    Configuration jobConf = job.getConfiguration();
-                    jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
-                    jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-                    jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
-                    jobConf.setBoolean("mapred.compress.map.output", true);
-//                    jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-
-                    job.setInputFormatClass(TextInputFormat.class);
-
-                    job.setMapperClass(ParseNtripsMapper.class);
-                    job.setMapOutputKeyClass(Key.class);
-                    job.setMapOutputValueClass(Value.class);
-
-                    job.setCombinerClass(OutStmtMutationsReducer.class);
-                    job.setReducerClass(OutStmtMutationsReducer.class);
-                    job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
-                    CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-                    jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
-
-                    TextInputFormat.setInputPaths(job, new Path(inputDir));
-
-                    FileSystem fs = FileSystem.get(conf);
-                    Path workPath = new Path(workDir);
-                    if (fs.exists(workPath))
-                        fs.delete(workPath, true);
-
-                    CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
-                    out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
-                    if (!tableOperations.exists(tableName))
-                        tableOperations.create(tableName);
-                    Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-                    for (Text split : splits)
-                        out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
-                    job.setNumReduceTasks(splits.size() + 1);
-                    out.close();
-
-                    job.setPartitionerClass(KeyRangePartitioner.class);
-                    RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
-                    jobConf.set(WORKDIR_PROP, workDir);
-
-                    job.submit();
-                    jobs.add(job);
-
-                } catch (Exception re) {
-                    throw new RuntimeException(re);
-                } finally {
-                    if (out != null)
-                        out.close();
-                }
-            }
-
-            for (Job job : jobs) {
-                while (!job.isComplete()) {
-                    Thread.sleep(1000);
-                }
-            }
-
-            for (String tableName : tables) {
-                String workDir = workDirBase + "/" + tableName;
-                tableOperations.importDirectory(
-                        tableName,
-                        workDir + "/files",
-                        workDir + "/failures",
-                        20,
-                        4,
-                        false);
-            }
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return 0;
-    }
-
-    /**
-     * Command line entry point: runs the tool through {@link ToolRunner}.
-     * <p/>
-     * The process exits with a non-zero status when the tool reports one or throws, so
-     * that calling scripts can detect a failed load.
-     *
-     * @param args command line arguments, passed on to {@link #run(String[])}
-     */
-    public static void main(String[] args) {
-        try {
-            final int exitCode = ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
-            if (exitCode != 0) {
-                System.exit(exitCode);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-    }
-
-    /**
-     * input: ntrips format triple
-     * <p/>
-     * output: key: shard row from generator
-     * value: stmt in serialized format (document format)
-     */
-    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
-
-        private RDFParser parser;
-        private String rdfFormat;
-        private String namedGraph;
-        private RyaContext ryaContext = RyaContext.getInstance();
-        private TripleRowResolver rowResolver = ryaContext.getTripleResolver();
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            final String table = conf.get(TABLE_PROPERTY);
-            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-
-            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
-            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
-            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-            checkNotNull(rdfFormat, "Rdf format cannot be null");
-
-            namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
-
-            parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
-    		parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
-                        if(rs.getColumnVisibility() == null) {
-                            rs.setColumnVisibility(cv);
-                        }
-
-                    	// Inject the specified context into the statement.
-                        if(namedGraph != null){
-                            rs.setContext(new RyaURI(namedGraph));
-                        } 
-
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT,TripleRow> serialize = rowResolver.serialize(rs);
-
-                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-                            context.write(
-                                    from(tripleRow),
-                                    extractValue(tripleRow)
-                            );
-                        } else
-                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        public void map(LongWritable key, Text value, Context output)
-                throws IOException, InterruptedException {
-            String rdf = value.toString();
-            try {
-                parser.parse(new StringReader(rdf), "");
-            } catch (RDFParseException e) {
-                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-            }
-        }
-    }
-
-    /**
-     * Emits every distinct key exactly once, paired with
-     * {@link CloudbaseRdfConstants#EMPTY_VALUE}.
-     * <p/>
-     * NOTE(review): the values produced by the mapper are discarded here - confirm that
-     * this is intended.
-     */
-    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
-        /**
-         * Writes {@code key} once, ignoring {@code values}.
-         *
-         * @param key    the key shared by all {@code values}
-         * @param values the grouped values, not read
-         * @param output task context the entry is written to
-         */
-        @Override
-        public void reduce(Key key, Iterable<Value> values, Context output)
-                throws IOException, InterruptedException {
-            output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
deleted file mode 100644
index 5aed4a2..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputByLineTool.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.RyaTableMutationsFactory;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-/**
- * Does a bulk import of RDF files, parsing the input line by line.
- * Class RdfFileInputByLineTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputByLineTool implements Tool {
-
-    private Configuration conf = new Configuration();
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String zk = "10.40.190.113:2181";
-    private String tablePrefix = null;
-    private RDFFormat format = RDFFormat.NTRIPLES;
-
-    /**
-     * Returns the configuration this tool currently holds.
-     *
-     * @return the configuration, initially a fresh {@link Configuration}
-     */
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    /**
-     * Replaces the configuration this tool holds.
-     *
-     * @param conf the configuration to use from now on
-     */
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    /**
-     * Command line entry point: runs the tool through {@link ToolRunner}.
-     * <p/>
-     * The process exits with a non-zero status when the tool reports one or throws, so
-     * that calling scripts can detect a failed import.
-     *
-     * @param args command line arguments, passed on to {@link #run(String[])}
-     */
-    public static void main(String[] args) {
-        try {
-            final int exitCode = ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
-            if (exitCode != 0) {
-                System.exit(exitCode);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-    }
-
-    /**
-     * Runs a map-only job that parses the text input under {@code args[0]} line by line
-     * and writes the resulting mutations through {@link CloudbaseOutputFormat}.
-     *
-     * @param args command line arguments; {@code args[0]} is the input path
-     * @return the value of the {@code REDUCE_OUTPUT_RECORDS} counter when the job
-     *         succeeds, {@code -1} when it fails
-     * @throws IllegalArgumentException if no input path was supplied
-     */
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-        if (args == null || args.length < 1) {
-            throw new IllegalArgumentException("Usage: RdfFileInputByLineTool <input path>");
-        }
-
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-        conf.setLong("mapred.task.timeout", 600000000);
-
-        // Settings from the configuration override the built-in defaults.
-        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-        format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName()));
-
-        // Assign the field; a local of the same name used to shadow it, leaving it unset.
-        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputByLineTool.class);
-
-        // Input: plain text files, one record per line.
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // Map output: table name -> mutation.
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        // Map-only job: the mapper's output goes straight to the output format.
-        job.setMapperClass(TextToMutationMapper.class);
-        job.setNumReduceTasks(0);
-
-        // Submit the job and wait for it.
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        boolean succeeded = job.waitForCompletion(true);
-
-        if (!succeeded) {
-            System.out.println("Job Failed!!!");
-            return -1;
-        }
-
-        Date end_time = new Date();
-        System.out.println("Job ended: " + end_time);
-        System.out.println("The job took "
-                + (end_time.getTime() - startTime.getTime()) / 1000
-                + " seconds.");
-        // NOTE(review): this job has no reduce tasks, so this counter is presumably
-        // always 0 - confirm whether a map counter was intended.
-        return job
-                .getCounters()
-                .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                        "REDUCE_OUTPUT_RECORDS").getValue();
-    }
-
-    /**
-     * Runs the import and translates its outcome into a process exit status.
-     *
-     * @param args command line arguments, passed on to {@link #runJob(String[])}
-     * @return 0 when the job succeeded, 1 when {@code runJob} reported a failure
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        // runJob returns a record counter on success and -1 on failure; a counter value
-        // is not a meaningful exit status, so map the outcome to 0/1.
-        return runJob(args) >= 0 ? 0 : 1;
-    }
-
-    /**
-     * Parses each input line as RDF and emits the statement's mutations, keyed by the
-     * name of the SPO, PO or OSP table they belong to.
-     */
-    public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-        protected RDFParser parser;
-        private String prefix;
-        private RDFFormat rdfFormat;
-        protected Text spo_table;
-        private Text po_table;
-        private Text osp_table;
-        private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
-
-        public TextToMutationMapper() {
-        }
-
-        /**
-         * Reads the job settings and builds the parser whose handler writes the mutations
-         * of every parsed statement to {@code context}.
-         *
-         * @param context task context; also the sink for the emitted mutations
-         */
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            final String configuredPrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (configuredPrefix != null) {
-                RdfCloudTripleStoreConstants.prefixTables(configuredPrefix);
-            }
-            // Fall back to the same default prefix runJob uses; concatenating the null
-            // reference would otherwise produce table names that start with "null".
-            prefix = configuredPrefix != null
-                    ? configuredPrefix
-                    : RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-
-            spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
-            if (cv_s != null) {
-                cv = cv_s.getBytes();
-            }
-
-            // Default by format name, as runJob does; valueOf looks formats up by name.
-            rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName()));
-            parser = Rio.createParser(rdfFormat);
-            final RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
-                        // Only apply the job-wide visibility when the statement has none.
-                        if (ryaStatement.getColumnVisibility() == null) {
-                            ryaStatement.setColumnVisibility(cv);
-                        }
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                                mut.serialize(ryaStatement);
-                        Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-                        Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-                        Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-                        for (Mutation m : spo) {
-                            context.write(spo_table, m);
-                        }
-                        for (Mutation m : po) {
-                            context.write(po_table, m);
-                        }
-                        for (Mutation m : osp) {
-                            context.write(osp_table, m);
-                        }
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        /**
-         * Parses one input line; the handler installed in {@code setup} emits the result.
-         *
-         * @param key     position of the line, unused
-         * @param value   the line to parse
-         * @param context task context, unused here; mutations are written through the
-         *                context captured in {@code setup}
-         * @throws IOException if the line cannot be parsed or handled
-         */
-        @Override
-        protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
-            try {
-                parser.parse(new StringReader(value.toString()), "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index 54f9a13..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.cloudbase.mr.utils.MRUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.*;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Be able to input multiple rdf formatted files. Convert from rdf format to statements.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
-
-    /**
-     * Files are never split: the record reader parses its file from the first byte to the
-     * last, so several splits of one file would each yield all of its statements.
-     *
-     * @param context  the job context, unused
-     * @param filename the file in question, unused
-     * @return always {@code false}
-     */
-    @Override
-    protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context, Path filename) {
-        return false;
-    }
-
-    /**
-     * Creates the reader that turns one RDF file into statements.
-     *
-     * @param inputSplit         the split to read, handed to the reader on initialization
-     * @param taskAttemptContext the task context, handed to the reader on initialization
-     * @return a new, uninitialized record reader
-     */
-    @Override
-    public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
-                                                                               TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new RdfFileRecordReader();
-    }
-
-    /**
-     * Parses the whole file during {@link #initialize}, queues every statement, and then
-     * hands the statements out one by one, keyed by a running number starting at 0.
-     */
-    private static class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
-
-        private final BlockingQueue<RyaStatementWritable> queue = new LinkedBlockingQueue<RyaStatementWritable>();
-        private boolean closed = false;
-        private long count = 0;
-        private int total = 0;
-        // The record selected by the latest successful nextKeyValue() call.
-        private LongWritable currentKey;
-        private RyaStatementWritable currentValue;
-
-        /**
-         * Parses the file behind the split and queues all of its statements.
-         *
-         * @throws IOException if the format is unknown or the file cannot be read or parsed
-         */
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-            FileSplit fileSplit = (FileSplit) inputSplit;
-            Configuration conf = taskAttemptContext.getConfiguration();
-            String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); //default to RDF/XML
-            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-            if (rdfFormat == null) {
-                throw new IOException("Unrecognized rdf format[" + rdfForm_s + "]");
-            }
-
-            Path file = fileSplit.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FSDataInputStream fileIn = fs.open(file);
-            try {
-                RDFParser rdfParser = Rio.createParser(rdfFormat);
-                rdfParser.setRDFHandler(this);
-                rdfParser.parse(fileIn, "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            } finally {
-                // Release the stream even when parsing fails.
-                fileIn.close();
-            }
-            total = queue.size();
-            //TODO: Make this threaded so that you don't hold too many statements before sending them
-        }
-
-        /**
-         * Advances to the next queued statement.
-         *
-         * @return {@code true} if a record is available through the getters
-         */
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            RyaStatementWritable next = queue.poll();
-            if (next == null) {
-                currentKey = null;
-                currentValue = null;
-                return false;
-            }
-            currentKey = new LongWritable(count++);
-            currentValue = next;
-            return true;
-        }
-
-        /**
-         * @return the key of the current record; repeated calls return the same key
-         */
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return currentKey;
-        }
-
-        /**
-         * @return the statement of the current record; repeated calls return the same one
-         */
-        @Override
-        public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
-            return currentValue;
-        }
-
-        /**
-         * @return the fraction of queued statements already handed out; 1 when the file
-         *         held no statements
-         */
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            if (total == 0) {
-                // Nothing to hand out; avoids the 0/0 division.
-                return 1.0f;
-            }
-            return ((float) (total - queue.size())) / ((float) total);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-        }
-
-        @Override
-        public void startRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void endRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleNamespace(String s, String s1) throws RDFHandlerException {
-        }
-
-        /**
-         * Queues the parsed statement for later delivery.
-         */
-        @Override
-        public void handleStatement(Statement statement) throws RDFHandlerException {
-            queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement)));
-        }
-
-        @Override
-        public void handleComment(String s) throws RDFHandlerException {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
deleted file mode 100644
index f48cbae..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/RdfFileInputTool.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package mvm.rya.cloudbase.mr.fileinput;
-
-import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-import cloudbase.core.data.Mutation;
-import cloudbase.core.security.ColumnVisibility;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.utils.RyaStatementWritable;
-import mvm.rya.cloudbase.CloudbaseRdfConstants;
-import mvm.rya.cloudbase.RyaTableMutationsFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.rio.RDFFormat;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import static mvm.rya.cloudbase.mr.utils.MRUtils.*;
-
-/**
- * Do bulk import of rdf files
- * Class RdfFileInputTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputTool implements Tool {
-
-    private Configuration conf;
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "stratus";
-    private String zk = "10.40.190.113:2181";
-    private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-    private String format = RDFFormat.RDFXML.getName();
-
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
-        //faster
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-
-        zk = conf.get(CB_ZK_PROP, zk);
-        instance = conf.get(CB_INSTANCE_PROP, instance);
-        userName = conf.get(CB_USERNAME_PROP, userName);
-        pwd = conf.get(CB_PWD_PROP, pwd);
-
-        tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, tablePrefix);
-        format = conf.get(FORMAT_PROP, format);
-        conf.set(FORMAT_PROP, format);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputTool.class);
-
-        // set up cloudbase input
-        job.setInputFormatClass(RdfFileInputFormat.class);
-        RdfFileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(RyaStatementWritable.class);
-//        job.setOutputKeyClass(LongWritable.class);
-//        job.setOutputValueClass(StatementWritable.class);
-
-        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        // set mapper and reducer classes
-        job.setMapperClass(StatementToMutationMapper.class);
-        job.setNumReduceTasks(0);
-//        job.setReducerClass(Reducer.class);
-
-        // set output
-//        Path outputDir = new Path("/temp/sparql-out/testout");
-//        FileSystem dfs = FileSystem.get(outputDir.toUri(), conf);
-//        if (dfs.exists(outputDir))
-//            dfs.deleteMutation(outputDir, true);
-//
-//        FileOutputFormat.setOutputPath(job, outputDir);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        runJob(args);
-        return 0;
-    }
-
-    public static class StatementToMutationMapper extends Mapper<LongWritable, RyaStatementWritable, Text, Mutation> {
-        protected String tablePrefix;
-        protected Text spo_table;
-        protected Text po_table;
-        protected Text osp_table;
-        private byte[] cv = CloudbaseRdfConstants.EMPTY_CV.getExpression();
-        RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-
-        public StatementToMutationMapper() {
-        }
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            tablePrefix = conf.get(TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-            spo_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(CB_CV_PROP);
-            if (cv_s != null)
-                cv = cv_s.getBytes();
-        }
-
-        @Override
-        protected void map(LongWritable key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
-            RyaStatement statement = value.getRyaStatement();
-            if (statement.getColumnVisibility() == null) {
-                statement.setColumnVisibility(cv);
-            }
-            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                    mut.serialize(statement);
-            Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-            Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-            Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-            for (Mutation m : spo) {
-                context.write(spo_table, m);
-            }
-            for (Mutation m : po) {
-                context.write(po_table, m);
-            }
-            for (Mutation m : osp) {
-                context.write(osp_table, m);
-            }
-        }
-
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
deleted file mode 100644
index 5d7d971..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/fileinput/ShardedBulkNtripsInputTool.java
+++ /dev/null
@@ -1,314 +0,0 @@
-//package mvm.rya.cloudbase.mr.fileinput;
-//
-//import cloudbase.core.client.Connector;
-//import cloudbase.core.client.ZooKeeperInstance;
-//import cloudbase.core.client.admin.TableOperations;
-//import cloudbase.core.client.mapreduce.CloudbaseFileOutputFormat;
-//import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner;
-//import cloudbase.core.data.Key;
-//import cloudbase.core.data.Value;
-//import cloudbase.core.util.TextUtil;
-//import com.google.common.base.Preconditions;
-//import mvm.rya.api.RdfCloudTripleStoreConstants;
-//import mvm.rya.cloudbase.CloudbaseRdfConstants;
-//import mvm.rya.cloudbase.RyaTableKeyValues;
-//import mvm.rya.cloudbase.mr.utils.MRUtils;
-//import mvm.rya.cloudbase.utils.bulk.KeyRangePartitioner;
-//import mvm.rya.cloudbase.utils.shard.HashAlgorithm;
-//import mvm.rya.cloudbase.utils.shard.HashCodeHashAlgorithm;
-//import org.apache.commons.codec.binary.Base64;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.conf.Configured;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.Mapper;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-//import org.apache.hadoop.util.Tool;
-//import org.apache.hadoop.util.ToolRunner;
-//import org.openrdf.model.Resource;
-//import org.openrdf.model.Statement;
-//import org.openrdf.rio.*;
-//
-//import java.io.BufferedOutputStream;
-//import java.io.IOException;
-//import java.io.PrintStream;
-//import java.io.StringReader;
-//import java.util.ArrayList;
-//import java.util.Collection;
-//import java.util.Map;
-//
-//import static com.google.common.base.Preconditions.checkNotNull;
-//
-///**
-//* Take large ntrips files and use MapReduce and Cloudbase
-//* Bulk ingest techniques to load into the table in our partition format.
-//* Uses a sharded scheme
-//* <p/>
-//* Input: NTrips file
-//* Map:
-//* - key : shard row - Text
-//* - value : stmt in doc triple format - Text
-//* Partitioner: RangePartitioner
-//* Reduce:
-//* - key : all the entries for each triple - Cloudbase Key
-//* Class BulkNtripsInputTool
-//* Date: Sep 13, 2011
-//* Time: 10:00:17 AM
-//*/
-//public class ShardedBulkNtripsInputTool extends Configured implements Tool {
-//
-//    public static final String WORKDIR_PROP = "bulk.n3.workdir";
-//    public static final String BULK_N3_NUMSHARD = "bulk.n3.numshard";
-//
-//    private String userName = "root";
-//    private String pwd = "password";
-//    private String instance = "stratus";
-//    private String zk = "10.40.190.129:2181";
-//    private String ttl = null;
-//    private String workDirBase = "/temp/bulkcb/work";
-//    private String format = RDFFormat.NTRIPLES.getName();
-//    private int numShards;
-//
-//    @Override
-//    public int run(final String[] args) throws Exception {
-//        final Configuration conf = getConf();
-//        try {
-//            //conf
-//            zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-//            ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
-//            instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-//            userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-//            pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-//            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-//            format = conf.get(MRUtils.FORMAT_PROP, format);
-//            String numShards_s = conf.get(BULK_N3_NUMSHARD);
-//            Preconditions.checkArgument(numShards_s != null);
-//            numShards = Integer.parseInt(numShards_s);
-//            conf.set(MRUtils.FORMAT_PROP, format);
-//            final String inputDir = args[0];
-//
-//            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-//            Connector connector = zooKeeperInstance.getConnector(userName, pwd);
-//            TableOperations tableOperations = connector.tableOperations();
-//
-//            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-//            if (tablePrefix != null)
-//                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-//            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-//                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-//                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-//            Collection<Job> jobs = new ArrayList<Job>();
-//            for (final String table : tables) {
-//                for (int i = 0; i < numShards; i++) {
-//                    final String tableName = table + i;
-//                    PrintStream out = null;
-//                    try {
-//                        String workDir = workDirBase + "/" + tableName;
-//                        System.out.println("Loading data into table[" + tableName + "]");
-//
-//                        Job job = new Job(new Configuration(conf), "Bulk Ingest load data to Generic RDF Table[" + tableName + "]");
-//                        job.setJarByClass(this.getClass());
-//                        //setting long job
-//                        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
-//                        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
-//                        job.getConfiguration().set("io.sort.mb", "256");
-//                        job.getConfiguration().setBoolean("mapred.compress.map.output", true);
-//                        job.getConfiguration().set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-//
-//                        job.setInputFormatClass(TextInputFormat.class);
-//
-//                        job.setMapperClass(ShardedParseNtripsMapper.class);
-//                        job.setMapOutputKeyClass(Key.class);
-//                        job.setMapOutputValueClass(Value.class);
-//
-//                        job.setCombinerClass(OutStmtMutationsReducer.class);
-//                        job.setReducerClass(OutStmtMutationsReducer.class);
-//                        job.setOutputFormatClass(CloudbaseFileOutputFormat.class);
-//                        CloudbaseFileOutputFormat.setZooKeeperInstance(job, instance, zk);
-//
-//                        job.getConfiguration().set(ShardedParseNtripsMapper.TABLE_PROPERTY, tableName);
-//                        job.getConfiguration().set(ShardedParseNtripsMapper.SHARD_PROPERTY, i + "");
-//
-//                        TextInputFormat.setInputPaths(job, new Path(inputDir));
-//
-//                        FileSystem fs = FileSystem.get(conf);
-//                        Path workPath = new Path(workDir);
-//                        if (fs.exists(workPath))
-//                            fs.deleteMutation(workPath, true);
-//
-//                        CloudbaseFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-//
-//                        out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-//
-//                        if (!tableOperations.exists(tableName))
-//                            tableOperations.create(tableName);
-//                        Collection<Text> splits = tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-//                        for (Text split : splits)
-//                            out.println(new String(Base64.encodeBase64(TextUtil.getBytes(split))));
-//
-//                        job.setNumReduceTasks(splits.size() + 1);
-//                        out.close();
-//
-//                        job.setPartitionerClass(KeyRangePartitioner.class);
-//                        RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-//
-//                        job.getConfiguration().set(WORKDIR_PROP, workDir);
-//
-//                        job.submit();
-//                        jobs.add(job);
-//
-//                    } catch (Exception re) {
-//                        throw new RuntimeException(re);
-//                    } finally {
-//                        if (out != null)
-//                            out.close();
-//                    }
-//                }
-//            }
-//
-//            for (Job job : jobs) {
-//                while (!job.isComplete()) {
-//                    Thread.sleep(1000);
-//                }
-//            }
-//
-//            for (String table : tables) {
-//                for (int i = 0; i < numShards; i++) {
-//                    final String tableName = table + i;
-//                    String workDir = workDirBase + "/" + tableName;
-//                    tableOperations.importDirectory(
-//                            tableName,
-//                            workDir + "/files",
-//                            workDir + "/failures",
-//                            20,
-//                            4,
-//                            false);
-//                }
-//            }
-//
-//        } catch (Exception e) {
-//            throw new RuntimeException(e);
-//        }
-//
-//        return 0;
-//    }
-//
-//    public static void main(String[] args) {
-//        try {
-//            ToolRunner.run(new Configuration(), new ShardedBulkNtripsInputTool(), args);
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//        }
-//    }
-//
-//    /**
-//     * input: ntrips format triple
-//     * <p/>
-//     * output: key: shard row from generator
-//     * value: stmt in serialized format (document format)
-//     */
-//    public static class ShardedParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-//        public static final String TABLE_PROPERTY = "shardedparsentripsmapper.table";
-//        public static final String SHARD_PROPERTY = "shardedparsentripsmapper.shard";
-//
-//        private RDFParser parser;
-//        private String rdfFormat;
-//        private HashAlgorithm hashAlgorithm = new HashCodeHashAlgorithm();
-//        private int shard;
-//        private int numShards;
-//
-//        @Override
-//        protected void setup(final Context context) throws IOException, InterruptedException {
-//            super.setup(context);
-//            Configuration conf = context.getConfiguration();
-//            final String table = conf.get(TABLE_PROPERTY);
-//            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-//
-//            String shard_s = conf.get(SHARD_PROPERTY);
-//            Preconditions.checkNotNull(shard_s, "Set the " + SHARD_PROPERTY + " property");
-//            shard = Integer.parseInt(shard_s);
-//
-//            numShards = Integer.parseInt(conf.get(BULK_N3_NUMSHARD));
-//
-//            final String cv_s = conf.get(MRUtils.CB_CV_PROP);
-//            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-//            checkNotNull(rdfFormat, "Rdf format cannot be null");
-//
-//            parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
-//            parser.setRDFHandler(new RDFHandler() {
-//
-//                @Override
-//                public void startRDF() throws RDFHandlerException {
-//
-//                }
-//
-//                @Override
-//                public void endRDF() throws RDFHandlerException {
-//
-//                }
-//
-//                @Override
-//                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-//
-//                }
-//
-//                @Override
-//                public void handleStatement(Statement statement) throws RDFHandlerException {
-//                    try {
-//                        Resource subject = statement.getSubject();
-//                        if ((hashAlgorithm.hash(subject.stringValue()) % numShards) != shard) {
-//                            return;
-//                        }
-//                        RyaTableKeyValues rdfTableKeyValues = new RyaTableKeyValues(subject, statement.getPredicate(), statement.getObject(), cv_s, statement.getContext()).invoke();
-//                        Collection<Map.Entry<Key, Value>> entries = null;
-//                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-//                            entries = rdfTableKeyValues.getSpo();
-//                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-//                            entries = rdfTableKeyValues.getPo();
-//                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-//                            entries = rdfTableKeyValues.getOsp();
-//                        } else
-//                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-//
-//                        for (Map.Entry<Key, Value> entry : entries) {
-//                            context.write(entry.getKey(), entry.getValue());
-//                        }
-//                    } catch (Exception e) {
-//                        throw new RDFHandlerException(e);
-//                    }
-//                }
-//
-//                @Override
-//                public void handleComment(String s) throws RDFHandlerException {
-//
-//                }
-//            });
-//        }
-//
-//        @Override
-//        public void map(LongWritable key, Text value, Context output)
-//                throws IOException, InterruptedException {
-//            String rdf = value.toString();
-//            try {
-//                parser.parse(new StringReader(rdf), "");
-//            } catch (RDFParseException e) {
-//                System.out.println("Line[" + rdf + "] cannot be formatted with format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-//            } catch (Exception e) {
-//                e.printStackTrace();
-//                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-//            }
-//        }
-//    }
-//
-//    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-//
-//        public void reduce(Key key, Iterable<Value> values, Context output)
-//                throws IOException, InterruptedException {
-//            output.write(key, CloudbaseRdfConstants.EMPTY_VALUE);
-//        }
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
deleted file mode 100644
index 453d6ca..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/upgrade/UpgradeCloudbaseRdfTables.java
+++ /dev/null
@@ -1,350 +0,0 @@
-//package mvm.rya.cloudbase.mr.upgrade;
-//
-//import cloudbase.core.client.Connector;
-//import cloudbase.core.client.ZooKeeperInstance;
-//import cloudbase.core.client.admin.TableOperations;
-//import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
-//import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
-//import cloudbase.core.data.Key;
-//import cloudbase.core.data.Mutation;
-//import cloudbase.core.data.Range;
-//import cloudbase.core.data.Value;
-//import cloudbase.core.security.Authorizations;
-//import cloudbase.core.security.ColumnVisibility;
-//import cloudbase.core.util.Pair;
-//import com.google.common.collect.Lists;
-//import com.google.common.io.ByteArrayDataInput;
-//import com.google.common.io.ByteArrayDataOutput;
-//import com.google.common.io.ByteStreams;
-//import mvm.rya.api.InvalidValueTypeMarkerRuntimeException;
-//import mvm.rya.api.RdfCloudTripleStoreConstants;
-//import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
-//import mvm.rya.cloudbase.CloudbaseRdfConstants;
-//import mvm.rya.cloudbase.CloudbaseRyaDAO;
-//import mvm.rya.cloudbase.RyaTableMutationsFactory;
-//import mvm.rya.cloudbase.mr.utils.MRUtils;
-//import org.apache.hadoop.conf.Configuration;
-//import org.apache.hadoop.conf.Configured;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.Mapper;
-//import org.apache.hadoop.util.Tool;
-//import org.apache.hadoop.util.ToolRunner;
-//import org.openrdf.model.*;
-//import org.openrdf.model.impl.StatementImpl;
-//import org.openrdf.model.impl.ValueFactoryImpl;
-//
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.Collection;
-//import java.util.Date;
-//import java.util.Map;
-//
-//import static mvm.rya.api.RdfCloudTripleStoreUtils.*;
-//
-///**
-// * 1. Check version. <br/>
-// * 2. If version does not exist, apply: <br/>
-// * - DELIM => 1 -> 0
-// * - DELIM_STOP => 2 -> 1
-// * - 3 table index
-// */
-//public class UpgradeCloudbaseRdfTables extends Configured implements Tool {
-//    public static final String TMP = "_tmp";
-//    public static final String DELETE_PROP = "rdf.upgrade.deleteMutation"; //true if ok to deleteMutation old tables
-//    private String zk = "10.40.190.113:2181";
-//    private String instance = "stratus";
-//    private String userName = "root";
-//    private String pwd = "password";
-//    private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-//    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
-//
-//    @Override
-//    public int run(String[] strings) throws Exception {
-//        conf = new CloudbaseRdfConfiguration(getConf());
-//        //faster
-//        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-//        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-//        conf.set(MRUtils.JOB_NAME_PROP, "Upgrading Cloudbase Rdf Tables");
-//
-//        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
-//        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
-//        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
-//        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
-//
-//        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-//
-//        Authorizations authorizations = CloudbaseRdfConstants.ALL_AUTHORIZATIONS;
-//        String auth = conf.get(MRUtils.CB_AUTH_PROP);
-//        if (auth != null)
-//            authorizations = new Authorizations(auth.split(","));
-//
-//        boolean deleteTables = conf.getBoolean(DELETE_PROP, false);
-//
-//        //tables
-//        String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-//        String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-//        String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-//        String so = tablePrefix + "so";
-//        String ops = tablePrefix + "o";
-//
-//        //check version first
-//        Connector connector = new ZooKeeperInstance(instance, zk).getConnector(userName, pwd.getBytes());
-//        CloudbaseRyaDAO rdfDAO = new CloudbaseRyaDAO();
-//        rdfDAO.setConnector(connector);
-//        conf.setTablePrefix(tablePrefix);
-//        rdfDAO.setConf(conf);
-////        rdfDAO.setSpoTable(spo);
-////        rdfDAO.setPoTable(po);
-////        rdfDAO.setOspTable(osp);
-////        rdfDAO.setNamespaceTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
-//        rdfDAO.init();
-//        String version = rdfDAO.getVersion();
-//        if (version != null) {
-//            //TODO: Do a version check here
-//            //version found, no need to upgrade
-//            return 0;
-//        }
-//
-//        rdfDAO.destroy();
-//
-//        //create osp table, deleteMutation so and o tables
-//        TableOperations tableOperations = connector.tableOperations();
-//        if (deleteTables) {
-//            if (tableOperations.exists(so)) {
-//                tableOperations.deleteMutation(so);
-//            }
-//            if (tableOperations.exists(ops)) {
-//                tableOperations.deleteMutation(ops);
-//            }
-//        }
-//
-//        conf.set("io.sort.mb", "256");
-//        Job job = new Job(conf);
-//        job.setJarByClass(UpgradeCloudbaseRdfTables.class);
-//
-//        //set up cloudbase input
-//        job.setInputFormatClass(CloudbaseInputFormat.class);
-//        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), spo, authorizations);
-//        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
-//        Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>();
-//        final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT);
-//        columns.add(pair);
-//        CloudbaseInputFormat.fetchColumns(job, columns);
-//
-//        CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
-//
-//        // set input output of the particular job
-//        job.setMapOutputKeyClass(Text.class);
-//        job.setMapOutputValueClass(Mutation.class);
-//
-//        //no reducer needed?
-//        job.setNumReduceTasks(0);
-//        job.setMapperClass(UpgradeCloudbaseRdfTablesMapper.class);
-//
-//        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, spo + TMP);
-//        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
-//        job.setOutputFormatClass(CloudbaseOutputFormat.class);
-//
-//        // Submit the job
-//        Date startTime = new Date();
-//        System.out.println("Job started: " + startTime);
-//        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-//
-//        if (exitCode == 0) {
-//            Date end_time = new Date();
-//            System.out.println("Job ended: " + end_time);
-//            System.out.println("The job took "
-//                    + (end_time.getTime() - startTime.getTime()) / 1000
-//                    + " seconds.");
-//
-//            //now deleteMutation old spo table, and rename tmp one
-//            if (deleteTables) {
-//                tableOperations.deleteMutation(spo);
-//                tableOperations.rename(spo + TMP, spo);
-//                tableOperations.deleteMutation(po);
-//                tableOperations.rename(po + TMP, po);
-//                tableOperations.deleteMutation(osp);
-//                tableOperations.rename(osp + TMP, osp);
-//            }
-//
-//            return 0;
-//        } else {
-//            System.out.println("Job Failed!!!");
-//        }
-//
-//        return -1;
-//    }
-//
-//    public static void main(String[] args) {
-//        try {
-//            ToolRunner.run(new Configuration(), new UpgradeCloudbaseRdfTables(), args);
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//        }
-//    }
-//
-//    public static class UpgradeCloudbaseRdfTablesMapper extends Mapper<Key, Value, Text, Mutation> {
-//        private String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-//        ValueFactoryImpl vf = new ValueFactoryImpl();
-//
-//        private Text spo_table, po_table, osp_table;
-//
-//        RyaTableMutationsFactory mut = new RyaTableMutationsFactory();
-//
-//        @Override
-//        protected void setup(Context context) throws IOException, InterruptedException {
-//            super.setup(context);
-//            Configuration conf = context.getConfiguration();
-//            tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-//            String spo = tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX + TMP;
-//            String po = tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX + TMP;
-//            String osp = tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX + TMP;
-//
-//            spo_table = new Text(spo);
-//            po_table = new Text(po);
-//            osp_table = new Text(osp);
-//        }
-//
-//        @Override
-//        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-//            //read in old format
-//            Statement statement = null;
-//            try {
-//                statement = translateOldStatementFromRow(ByteStreams.newDataInput(key.getRow().getBytes()), "spo", vf);
-//            } catch (Exception e) {
-//                //not the right version
-//                return;
-//            }
-//
-//            //translate to new format and save in new tables
-//            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Mutation> mutationMap = mut.serialize(statement.getSubject(), statement.getPredicate(), statement.getObject(), new ColumnVisibility(key.getColumnVisibility()), statement.getContext());
-//            Mutation spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-//            Mutation po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-//            Mutation osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-//
-//            context.write(spo_table, spo);
-//            context.write(po_table, po);
-//            context.write(osp_table, osp);
-//
-//            //TODO: Contexts
-//        }
-//    }
-//
-//    public static org.openrdf.model.Value readOldValue(ByteArrayDataInput dataIn, ValueFactory vf)
-//            throws IOException, ClassCastException {
-//        int valueTypeMarker;
-//        try {
-//            valueTypeMarker = dataIn.readByte();
-//        } catch (Exception e) {
-//            return null;
-//        }
-//
-//        org.openrdf.model.Value ret = null;
-//        if (valueTypeMarker == RdfCloudTripleStoreConstants.URI_MARKER) {
-//            String uriString = readString(dataIn);
-//            ret = vf.createURI(uriString);
-//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.BNODE_MARKER) {
-//            String bnodeID = readString(dataIn);
-//            ret = vf.createBNode(bnodeID);
-//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER) {
-//            String label = readString(dataIn);
-//            ret = vf.createLiteral(label);
-//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER) {
-//            String label = readString(dataIn);
-//            String language = readString(dataIn);
-//            ret = vf.createLiteral(label, language);
-//        } else if (valueTypeMarker == RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER) {
-//            String label = readString(dataIn);
-//            URI datatype = (URI) readOldValue(dataIn, vf);
-//            ret = vf.createLiteral(label, datatype);
-//        } else {
-//            throw new InvalidValueTypeMarkerRuntimeException(valueTypeMarker, "Invalid value type marker: "
-//                    + valueTypeMarker);
-//        }
-//
-//        return ret;
-//    }
-//
-//    public static Statement translateOldStatementFromRow(ByteArrayDataInput input, String table, ValueFactory vf) throws IOException {
-//        Resource subject;
-//        URI predicate;
-//        org.openrdf.model.Value object;
-//        if ("spo".equals(table)) {
-//            subject = (Resource) readOldValue(input, vf);
-//            input.readByte();
-//            predicate = (URI) readOldValue(input, vf);
-//            input.readByte();
-//            object = readOldValue(input, vf);
-//        } else if ("o".equals(table)) {
-//            object = readOldValue(input, vf);
-//            input.readByte();
-//            predicate = (URI) readOldValue(input, vf);
-//            input.readByte();
-//            subject = (Resource) readOldValue(input, vf);
-//        } else if ("po".equals(table)) {
-//            predicate = (URI) readOldValue(input, vf);
-//            input.readByte();
-//            object = readOldValue(input, vf);
-//            input.readByte();
-//            subject = (Resource) readOldValue(input, vf);
-//        } else {
-//            //so
-//            subject = (Resource) readOldValue(input, vf);
-//            input.readByte();
-//            object = readOldValue(input, vf);
-//            input.readByte();
-//            predicate = (URI) readOldValue(input, vf);
-//        }
-//        return new StatementImpl(subject, predicate, object);
-//    }
-//
-//    public static byte[] writeOldValue(org.openrdf.model.Value value) throws IOException {
-//        if (value == null)
-//            return new byte[]{};
-//        ByteArrayDataOutput dataOut = ByteStreams.newDataOutput();
-//        if (value instanceof URI) {
-//            dataOut.writeByte(RdfCloudTripleStoreConstants.URI_MARKER);
-//            writeString(((URI) value).toString(), dataOut);
-//        } else if (value instanceof BNode) {
-//            dataOut.writeByte(RdfCloudTripleStoreConstants.BNODE_MARKER);
-//            writeString(((BNode) value).getID(), dataOut);
-//        } else if (value instanceof Literal) {
-//            Literal lit = (Literal) value;
-//
-//            String label = lit.getLabel();
-//            String language = lit.getLanguage();
-//            URI datatype = lit.getDatatype();
-//
-//            if (datatype != null) {
-//                dataOut.writeByte(RdfCloudTripleStoreConstants.DATATYPE_LITERAL_MARKER);
-//                writeString(label, dataOut);
-//                dataOut.write(writeOldValue(datatype));
-//            } else if (language != null) {
-//                dataOut.writeByte(RdfCloudTripleStoreConstants.LANG_LITERAL_MARKER);
-//                writeString(label, dataOut);
-//                writeString(language, dataOut);
-//            } else {
-//                dataOut.writeByte(RdfCloudTripleStoreConstants.PLAIN_LITERAL_MARKER);
-//                writeString(label, dataOut);
-//            }
-//        } else {
-//            throw new IllegalArgumentException("unexpected value type: "
-//                    + value.getClass());
-//        }
-//        return dataOut.toByteArray();
-//    }
-//
-//    private static String OLD_DELIM = "\u0001";
-//    private static byte[] OLD_DELIM_BYTES = OLD_DELIM.getBytes();
-//
-//    public static byte[] buildOldRowWith(byte[] bytes_one, byte[] bytes_two, byte[] bytes_three) throws IOException {
-//        ByteArrayDataOutput rowidout = ByteStreams.newDataOutput();
-//        rowidout.write(bytes_one);
-//        rowidout.write(OLD_DELIM_BYTES);
-//        rowidout.write(bytes_two);
-//        rowidout.write(OLD_DELIM_BYTES);
-//        rowidout.write(bytes_three);
-//        return truncateRowId(rowidout.toByteArray());
-//    }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
deleted file mode 100644
index 950f585..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/utils/MRUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package mvm.rya.cloudbase.mr.utils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.openrdf.model.URI;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-/**
- * Class MRSailUtils
- * Date: May 19, 2011
- * Time: 10:34:06 AM
- */
-public class MRUtils {
-
-    public static final String JOB_NAME_PROP = "mapred.job.name";
-
-    public static final String CB_USERNAME_PROP = "cb.username";
-    public static final String CB_PWD_PROP = "cb.pwd";
-    public static final String CB_ZK_PROP = "cb.zk";
-    public static final String CB_INSTANCE_PROP = "cb.instance";
-    public static final String CB_TTL_PROP = "cb.ttl";
-    public static final String CB_CV_PROP = "cb.cv";
-    public static final String CB_AUTH_PROP = "cb.auth";
-    public static final String CB_MOCK_PROP = "cb.mock";
-    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
-    public static final String FORMAT_PROP = "rdf.format";
-
-    public static final String NAMED_GRAPH_PROP = "rdf.graph";
-
-    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
-
-    // rdf constants
-    public static final ValueFactory vf = new ValueFactoryImpl();
-    public static final URI RDF_TYPE = vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type");
-
-
-    // cloudbase map reduce utils
-
-//    public static Range retrieveRange(URI entry_key, URI entry_val) throws IOException {
-//        ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput();
-//        startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_key));
-//        if (entry_val != null) {
-//            startRowOut.write(RdfCloudTripleStoreConstants.DELIM_BYTES);
-//            startRowOut.write(RdfCloudTripleStoreUtils.writeValue(entry_val));
-//        }
-//        byte[] startrow = startRowOut.toByteArray();
-//        startRowOut.write(RdfCloudTripleStoreConstants.DELIM_STOP_BYTES);
-//        byte[] stoprow = startRowOut.toByteArray();
-//
-//        Range range = new Range(new Text(startrow), new Text(stoprow));
-//        return range;
-//    }
-
-
-    public static String getCBTtl(Configuration conf) {
-        return conf.get(CB_TTL_PROP);
-    }
-
-    public static String getCBUserName(Configuration conf) {
-        return conf.get(CB_USERNAME_PROP);
-    }
-
-    public static String getCBPwd(Configuration conf) {
-        return conf.get(CB_PWD_PROP);
-    }
-
-    public static String getCBZK(Configuration conf) {
-        return conf.get(CB_ZK_PROP);
-    }
-
-    public static String getCBInstance(Configuration conf) {
-        return conf.get(CB_INSTANCE_PROP);
-    }
-
-    public static void setCBUserName(Configuration conf, String str) {
-        conf.set(CB_USERNAME_PROP, str);
-    }
-
-    public static void setCBPwd(Configuration conf, String str) {
-        conf.set(CB_PWD_PROP, str);
-    }
-
-    public static void setCBZK(Configuration conf, String str) {
-        conf.set(CB_ZK_PROP, str);
-    }
-
-    public static void setCBInstance(Configuration conf, String str) {
-        conf.set(CB_INSTANCE_PROP, str);
-    }
-
-    public static void setCBTtl(Configuration conf, String str) {
-        conf.set(CB_TTL_PROP, str);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
deleted file mode 100644
index d3f8ae7..0000000
--- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/BatchScannerCloseableIterable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package mvm.rya.cloudbase.query;
-
-import cloudbase.core.client.BatchScanner;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Value;
-import com.google.common.base.Preconditions;
-import mango.collect.AbstractCloseableIterable;
-import mvm.rya.cloudbase.BatchScannerIterator;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- */
-public class BatchScannerCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> {
-
-    private BatchScanner scanner;
-
-    public BatchScannerCloseableIterable(BatchScanner scanner) {
-        Preconditions.checkNotNull(scanner);
-        this.scanner = scanner;
-    }
-
-    @Override
-    protected void doClose() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
-        return new BatchScannerIterator(scanner.iterator());
-    }
-}