Posted to dev@rya.apache.org by mi...@apache.org on 2015/12/07 13:04:35 UTC

[05/51] [partial] incubator-rya git commit: Cannot delete temp branch, doc'd it.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
deleted file mode 100644
index 054146d..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.OrderedLoadFunc;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * <p/>
- * A Key/Value pair will be returned as a tuple: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray; timestamp is a long.
- * <p/>
- * Tuples can be written in two forms:
- * (key, colfam, colqual, colvis, value)
- * OR
- * (key, colfam, colqual, value)
- */
-public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, OrderedLoadFunc {
-    private static final Log logger = LogFactory.getLog(AccumuloStorage.class);
-
-    protected Configuration conf;
-    protected RecordReader<Key, Value> reader;
-    protected RecordWriter<Text, Mutation> writer;
-
-    protected String inst;
-    protected String zookeepers;
-    protected String user = "";
-    protected String password = "";
-    protected String table;
-    protected Text tableName;
-    protected String auths;
-    protected Authorizations authorizations = Constants.NO_AUTHS;
-    protected List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>();
-
-    protected Collection<Range> ranges = new ArrayList<Range>();
-    protected boolean mock = false;
-
-    public AccumuloStorage() {
-    }
-
-    @Override
-    public Tuple getNext() throws IOException {
-        try {
-            // load the next pair
-            if (!reader.nextKeyValue()) {
-                logger.info("Reached end of results");
-                return null;
-            }
-
-            Key key = (Key) reader.getCurrentKey();
-            Value value = (Value) reader.getCurrentValue();
-            assert key != null && value != null;
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Found key[" + key + "] and value[" + value + "]");
-            }
-
-            // and wrap it in a tuple
-            Tuple tuple = TupleFactory.getInstance().newTuple(6);
-            tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-            tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-            tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-            tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-            tuple.set(4, key.getTimestamp());
-            tuple.set(5, new DataByteArray(value.get()));
-            if (logger.isTraceEnabled()) {
-                logger.trace("Output tuple[" + tuple + "]");
-            }
-            return tuple;
-        } catch (InterruptedException e) {
-            throw new IOException(e.getMessage());
-        }
-    }
-
-    @Override
-    public InputFormat getInputFormat() {
-        return new AccumuloInputFormat();
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        this.reader = reader;
-    }
-
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]");
-        }
-        conf = job.getConfiguration();
-        setLocationFromUri(location, job);
-
-        if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
-            try {
-                AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
-            } catch (AccumuloSecurityException e) {
-                throw new RuntimeException(e);
-            }
-            AccumuloInputFormat.setInputTableName(job, table);
-            AccumuloInputFormat.setScanAuthorizations(job, authorizations);
-            if (!mock) {
-                AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
-            } else {
-                AccumuloInputFormat.setMockInstance(job, inst);
-            }
-        }
-        if (columnFamilyColumnQualifierPairs.size() > 0)
-            AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
-        logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " +
-                "for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]");
-
-        if (ranges.size() == 0) {
-            throw new IOException("Accumulo Range must be specified");
-        }
-        AccumuloInputFormat.setRanges(job, ranges);
-    }
-
-    protected void setLocationFromUri(String uri, Job job) throws IOException {
-        // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true
-        try {
-            if (!uri.startsWith("accumulo://"))
-                throw new Exception("Bad scheme.");
-            String[] urlParts = uri.split("\\?");
-            setLocationFromUriParts(urlParts);
-
-        } catch (Exception e) {
-            throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e);
-        }
-    }
-
-    protected void setLocationFromUriParts(String[] urlParts) {
-        String columns = "";
-        if (urlParts.length > 1) {
-            for (String param : urlParts[1].split("&")) {
-                String[] pair = param.split("=");
-                if (pair[0].equals("instance")) {
-                    inst = pair[1];
-                } else if (pair[0].equals("user")) {
-                    user = pair[1];
-                } else if (pair[0].equals("password")) {
-                    password = pair[1];
-                } else if (pair[0].equals("zookeepers")) {
-                    zookeepers = pair[1];
-                } else if (pair[0].equals("auths")) {
-                    auths = pair[1];
-                } else if (pair[0].equals("columns")) {
-                    columns = pair[1];
-                } else if (pair[0].equals("range")) {
-                    String[] r = pair[1].split("\\|");
-                    if (r.length == 2) {
-                        addRange(new Range(r[0], r[1]));
-                    } else {
-                        addRange(new Range(r[0]));
-                    }
-                } else if (pair[0].equals("mock")) {
-                    this.mock = Boolean.parseBoolean(pair[1]);
-                }
-                addLocationFromUriPart(pair);
-            }
-        }
-        String[] parts = urlParts[0].split("/+");
-        table = parts[1];
-        tableName = new Text(table);
-
-        if (auths == null || auths.equals("")) {
-            authorizations = new Authorizations();
-        } else {
-            authorizations = new Authorizations(auths.split(","));
-        }
-
-        if (!columns.equals("")) {
-            for (String cfCq : columns.split(",")) {
-                if (cfCq.contains("|")) {
-                    String[] c = cfCq.split("\\|");
-                    String cf = c[0];
-                    String cq = c[1];
-                    addColumnPair(cf, cq);
-                } else {
-                    addColumnPair(cfCq, null);
-                }
-            }
-        }
-    }
-
-    protected void addColumnPair(String cf, String cq) {
-        columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? new Text(cq) : null));
-    }
-
-    protected void addLocationFromUriPart(String[] pair) {
-
-    }
-
-    protected void addRange(Range range) {
-        ranges.add(range);
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature) {
-
-    }
-
-    /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature) {
-
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-        return relativeToAbsolutePath(location, curDir);
-    }
-
-    public void setStoreLocation(String location, Job job) throws IOException {
-        conf = job.getConfiguration();
-        setLocationFromUri(location, job);
-
-        if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
-            try {
-                AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
-            } catch (AccumuloSecurityException e) {
-                throw new RuntimeException(e);
-            }
-            AccumuloOutputFormat.setDefaultTableName(job, table);
-            AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
-            BatchWriterConfig config = new BatchWriterConfig();
-            config.setMaxLatency(10, TimeUnit.SECONDS);
-            config.setMaxMemory(10 * 1000 * 1000);
-            config.setMaxWriteThreads(10);
-            AccumuloOutputFormat.setBatchWriterOptions(job, config);
-        }
-    }
-
-    public OutputFormat getOutputFormat() {
-        return new AccumuloOutputFormat();
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
-    public void prepareToWrite(RecordWriter writer) {
-        this.writer = writer;
-    }
-
-    public void putNext(Tuple t) throws ExecException, IOException {
-        Mutation mut = new Mutation(objToText(t.get(0)));
-        Text cf = objToText(t.get(1));
-        Text cq = objToText(t.get(2));
-
-        if (t.size() > 4) {
-            Text cv = objToText(t.get(3));
-            Value val = new Value(objToBytes(t.get(4)));
-            if (cv.getLength() == 0) {
-                mut.put(cf, cq, val);
-            } else {
-                mut.put(cf, cq, new ColumnVisibility(cv), val);
-            }
-        } else {
-            Value val = new Value(objToBytes(t.get(3)));
-            mut.put(cf, cq, val);
-        }
-
-        try {
-            writer.write(tableName, mut);
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private static Text objToText(Object o) {
-        return new Text(objToBytes(o));
-    }
-
-    private static byte[] objToBytes(Object o) {
-        if (o instanceof String) {
-            String str = (String) o;
-            return str.getBytes();
-        } else if (o instanceof Long) {
-            Long l = (Long) o;
-            return l.toString().getBytes();
-        } else if (o instanceof Integer) {
-            Integer l = (Integer) o;
-            return l.toString().getBytes();
-        } else if (o instanceof Boolean) {
-            Boolean l = (Boolean) o;
-            return l.toString().getBytes();
-        } else if (o instanceof Float) {
-            Float l = (Float) o;
-            return l.toString().getBytes();
-        } else if (o instanceof Double) {
-            Double l = (Double) o;
-            return l.toString().getBytes();
-        }
-
-        // TODO: handle DataBag, Map<Object, Object>, and Tuple
-
-        return ((DataByteArray) o).get();
-    }
-
-    public void cleanupOnFailure(String failure, Job job) {
-    }
-
-    @Override
-    public WritableComparable<?> getSplitComparable(InputSplit inputSplit) throws IOException {
-        //cannot get access to the range directly
-        AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit;
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-        rangeInputSplit.write(out);
-        out.close();
-        DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-        Range range = new Range();
-        range.readFields(stream);
-        stream.close();
-        return range;
-    }
-}
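
For context on the tuple shapes the class above accepts on store, here is a minimal, self-contained sketch of the two forms named in the javadoc; it assumes only the Pig data API already imported by AccumuloStorage, and every literal row, column, and value string below is a hypothetical placeholder.

    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class AccumuloStorageTupleSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            // Form 1: (key, colfam, colqual, colvis, value) -- the visibility is applied when non-empty.
            Tuple withVisibility = tf.newTuple(5);
            withVisibility.set(0, new DataByteArray("row1".getBytes()));
            withVisibility.set(1, new DataByteArray("cf".getBytes()));
            withVisibility.set(2, new DataByteArray("cq".getBytes()));
            withVisibility.set(3, new DataByteArray("PUBLIC".getBytes()));
            withVisibility.set(4, new DataByteArray("value1".getBytes()));

            // Form 2: (key, colfam, colqual, value) -- no column visibility.
            Tuple withoutVisibility = tf.newTuple(4);
            withoutVisibility.set(0, new DataByteArray("row2".getBytes()));
            withoutVisibility.set(1, new DataByteArray("cf".getBytes()));
            withoutVisibility.set(2, new DataByteArray("cq".getBytes()));
            withoutVisibility.set(3, new DataByteArray("value2".getBytes()));

            // AccumuloStorage.putNext(...) would turn each of these into an Accumulo Mutation.
            System.out.println(withVisibility);
            System.out.println(withoutVisibility);
        }
    }

On load, getNext() instead returns six-field tuples of the form (key, colfam, colqual, colvis, timestamp, value), with timestamp as a long.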

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
deleted file mode 100644
index 392c108..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
+++ /dev/null
@@ -1,348 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.ProjectionElem;
-import org.openrdf.query.algebra.ProjectionElemList;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class IndexWritingTool extends Configured implements Tool {
-    
-    private static final String sparql_key = "SPARQL.VALUE";
-    private static String cardCounter = "count";
-    
-    
-    public static void main(String[] args) throws Exception {
-        
-      ToolRunner.run(new Configuration(), new IndexWritingTool(), args);
-       
-    }
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        Preconditions.checkArgument(args.length == 7, "java " + IndexWritingTool.class.getCanonicalName()
-                + " hdfsSaveLocation sparqlFile cbinstance cbzk cbuser cbpassword rdfTablePrefix.");
-
-        final String inputDir = args[0];
-        final String sparqlFile = args[1];
-        final String instStr = args[2];
-        final String zooStr = args[3];
-        final String userStr = args[4];
-        final String passStr = args[5];
-        final String tablePrefix = args[6];
-
-        String sparql = FileUtils.readFileToString(new File(sparqlFile));
-
-        Job job = new Job(getConf(), "Write HDFS Index to Accumulo");
-        job.setJarByClass(this.getClass());
-
-        Configuration jobConf = job.getConfiguration();
-        jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        setVarOrders(sparql, jobConf);
-
-        TextInputFormat.setInputPaths(job, inputDir);
-        job.setInputFormatClass(TextInputFormat.class);
-
-        job.setMapperClass(MyMapper.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Mutation.class);
-
-        job.setNumReduceTasks(0);
-
-        String tableName;
-        if (zooStr.equals("mock")) {
-            tableName = tablePrefix;
-        } else {
-            tableName = tablePrefix + "INDEX_" + UUID.randomUUID().toString().replace("-", "").toUpperCase();
-        }
-        setAccumuloOutput(instStr, zooStr, userStr, passStr, job, tableName);
-
-        jobConf.set(sparql_key, sparql);
-        
-        int complete = job.waitForCompletion(true) ? 0 : -1;
-
-        if (complete == 0) {
-            
-            String[] varOrders = jobConf.getStrings("varOrders");
-            String orders = Joiner.on("\u0000").join(varOrders);
-            Instance inst;
-            
-            if (zooStr.equals("mock")) {
-                inst = new MockInstance(instStr);
-            } else {
-                inst = new ZooKeeperInstance(instStr, zooStr);
-            }
-           
-            Connector conn = inst.getConnector(userStr, passStr.getBytes());
-            BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1);
-
-            Counters counters = job.getCounters();
-            Counter c1 = counters.findCounter(cardCounter, cardCounter);
-            
-            Mutation m = new Mutation("~SPARQL");
-            Value v = new Value(sparql.getBytes());
-            m.put(new Text("" + c1.getValue()), new Text(orders), v);
-            bw.addMutation(m);
-
-            bw.close();
-
-            return complete;
-        } else {
-            return complete;
-        }
-
-
-    }
-    
-    
-    public void setVarOrders(String s, Configuration conf) throws MalformedQueryException {
-
-        SPARQLParser parser = new SPARQLParser();
-        TupleExpr query = parser.parseQuery(s, null).getTupleExpr();
-
-        List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames());
-        String projElems = Joiner.on(";").join(projList);
-        conf.set("projElems", projElems);
-
-        Pattern splitPattern1 = Pattern.compile("\n");
-        Pattern splitPattern2 = Pattern.compile(",");
-        String[] lines = splitPattern1.split(s);
-
-        List<String> varOrders = Lists.newArrayList();
-        List<String> varOrderPos = Lists.newArrayList();
-
-        int orderNum = 0;
-        int projSizeSq = projList.size()*projList.size();
-        
-        for (String t : lines) {
-
-
-            if(orderNum > projSizeSq){
-                break;
-            }
-            
-            String[] order = null;
-            if (t.startsWith("#prefix")) {
-                t = t.substring(7).trim();
-                order = splitPattern2.split(t, projList.size());
-            }
-
-            
-            String tempVarOrder = "";
-            String tempVarOrderPos = "";
-
-            if (order != null) {
-                for (String u : order) {
-                    if (tempVarOrder.length() == 0) {
-                        tempVarOrder = u.trim();
-                    } else {
-                        tempVarOrder = tempVarOrder + ";" + u.trim();
-                    }
-                    int pos = projList.indexOf(u.trim());
-                    if (pos < 0) {
-                        throw new IllegalArgumentException("Invalid variable order!");
-                    } else {
-                        if (tempVarOrderPos.length() == 0) {
-                            tempVarOrderPos = tempVarOrderPos + pos;
-                        } else {
-                            tempVarOrderPos = tempVarOrderPos + ";" + pos;
-                        }
-                    }
-                }
-
-                varOrders.add(tempVarOrder);
-                varOrderPos.add(tempVarOrderPos);
-            }
-            
-            if(tempVarOrder.length() > 0) {
-                orderNum++;
-            }
-
-        }
-        
-        if(orderNum ==  0) {
-            varOrders.add(projElems);
-            String tempVarPos = "";
-            
-            for(int i = 0; i < projList.size(); i++) {
-                if(i == 0) {
-                    tempVarPos = Integer.toString(0);
-                } else {
-                    tempVarPos = tempVarPos + ";" + i;
-                }
-            }
-            varOrderPos.add(tempVarPos);
-            
-        }
-        
-        String[] vOrders = varOrders.toArray(new String[varOrders.size()]);
-        String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]);
-        
-        
-        
-        conf.setStrings("varOrders", vOrders);
-        conf.setStrings("varOrderPos", vOrderPos);
-
-    }
-    
-
-    private static void setAccumuloOutput(String instStr, String zooStr, String userStr, String passStr, Job job, String tableName)
-            throws AccumuloSecurityException {
-
-        AuthenticationToken token = new PasswordToken(passStr);
-        AccumuloOutputFormat.setConnectorInfo(job, userStr, token);
-        AccumuloOutputFormat.setDefaultTableName(job, tableName);
-        AccumuloOutputFormat.setCreateTables(job, true);
-        //TODO best way to do this?
-        
-        if (zooStr.equals("mock")) {
-            AccumuloOutputFormat.setMockInstance(job, instStr);
-        } else {
-            AccumuloOutputFormat.setZooKeeperInstance(job, instStr, zooStr);
-        }
-
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Mutation.class);
-    }
-
-    public static class MyMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-        
-        private static final Logger logger = Logger.getLogger(MyMapper.class);
-        final static Text EMPTY_TEXT = new Text();
-        final static Value EMPTY_VALUE = new Value(new byte[] {});
-        private String[] varOrderPos = null;
-        private String[] projElem = null;
-        private Pattern splitPattern = null;
-        private List<List<Integer>> varPositions = Lists.newArrayList();
-        
-        
-
-        @Override
-        protected void setup(Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException,
-                InterruptedException {
-           
-            Configuration conf = context.getConfiguration();
-            
-            varOrderPos = conf.getStrings("varOrderPos");
-            splitPattern = Pattern.compile("\t");
-            
-            for (String s : varOrderPos) {
-                String[] pos = s.split(";");
-                List<Integer> intPos = Lists.newArrayList();
-                int i = 0;
-                for(String t: pos) {
-                    i = Integer.parseInt(t);
-                    intPos.add(i);
-                }
-                
-                varPositions.add(intPos);
-                
-            }
-            
-            projElem = conf.get("projElems").split(";");
-            
-            super.setup(context);
-        }
-
-
-
-
-
-
-        @Override
-        public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
-
-            String[] result = splitPattern.split(value.toString());
-
-           
-            for (List<Integer> list : varPositions) {
-
-                String values = "";
-                String vars = "";
-
-                for (Integer i : list) {
-
-                    if (values.length() == 0) {
-                        values = result[i];
-                        vars = projElem[i];
-                    } else {
-                        values = values + "\u0000" + result[i];
-                        vars = vars + "\u0000" + projElem[i];
-                    }
-
-                }
-                Mutation m = new Mutation(new Text(values));
-                m.put(new Text(vars), EMPTY_TEXT, EMPTY_VALUE);
-                output.write(EMPTY_TEXT, m);
-
-            }
-            output.getCounter(cardCounter, cardCounter).increment(1);
-
-        }
-    }
-
-}
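
IndexWritingTool.run above expects exactly seven positional arguments. A small launcher sketch, with every connection value a made-up placeholder, shows how the tool could be driven programmatically through ToolRunner, the same entry point its own main method uses.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import mvm.rya.accumulo.pig.IndexWritingTool;

    public class IndexWritingToolLauncher {
        public static void main(String[] args) throws Exception {
            // All seven values are hypothetical placeholders; the order matches the usage message above.
            String[] toolArgs = {
                "/user/rya/index-input",  // args[0]: HDFS path the job reads ("hdfsSaveLocation" in the usage text)
                "/queries/card.sparql",   // args[1]: SPARQL file whose variable orders drive the index rows
                "devInstance",            // args[2]: Accumulo instance name
                "zoo1:2181",              // args[3]: zookeepers ("mock" switches the tool to a MockInstance)
                "root",                   // args[4]: user
                "secret",                 // args[5]: password
                "rya_"                    // args[6]: rdf table prefix
            };
            int exitCode = ToolRunner.run(new Configuration(), new IndexWritingTool(), toolArgs);
            System.exit(exitCode);
        }
    }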

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
deleted file mode 100644
index ed8134d..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfEvalStatsDAO;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;
-import mvm.rya.rdftriplestore.evaluation.QueryJoinOptimizer;
-import mvm.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics;
-import mvm.rya.rdftriplestore.inference.InferenceEngine;
-import mvm.rya.rdftriplestore.inference.InverseOfVisitor;
-import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
-import mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.openrdf.query.algebra.QueryRoot;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Transforms a SPARQL query into a Pig Latin script and executes it on a {@link PigServer},
- * optionally applying inference visitors and statistics-based join optimization first.
- */
-public class SparqlQueryPigEngine {
-    private static final Log logger = LogFactory.getLog(SparqlQueryPigEngine.class);
-
-    private String hadoopDir;
-    private ExecType execType = ExecType.MAPREDUCE; //default to mapreduce
-    private boolean inference = true;
-    private boolean stats = true;
-    private SparqlToPigTransformVisitor sparqlToPigTransformVisitor;
-    private PigServer pigServer;
-    private InferenceEngine inferenceEngine = null;
-    private RdfCloudTripleStoreEvaluationStatistics rdfCloudTripleStoreEvaluationStatistics;
-    private AccumuloRyaDAO ryaDAO;
-    AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-
-    private AccumuloRdfEvalStatsDAO rdfEvalStatsDAO;
-
-    public AccumuloRdfConfiguration getConf() {
-        return conf;
-    }
-
-    public void setConf(AccumuloRdfConfiguration conf) {
-        this.conf = conf;
-    }
-
-    public void init() throws Exception {
-        Preconditions.checkNotNull(sparqlToPigTransformVisitor, "Sparql To Pig Transform Visitor must not be null");
-        logger.info("Initializing Sparql Query Pig Engine");
-        if (hadoopDir != null) {
-            //set hadoop dir property
-            System.setProperty("HADOOPDIR", hadoopDir);
-        }
-        //TODO: Maybe have validation of the HadoopDir system property
-
-        if (pigServer == null) {
-            pigServer = new PigServer(execType);
-        }
-
-        if (inference || stats) {
-            String instance = sparqlToPigTransformVisitor.getInstance();
-            String zoo = sparqlToPigTransformVisitor.getZk();
-            String user = sparqlToPigTransformVisitor.getUser();
-            String pass = sparqlToPigTransformVisitor.getPassword();
-
-            Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes());
-
-            String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix();
-            conf.setTablePrefix(tablePrefix);
-            if (inference) {
-                logger.info("Using inference");
-                inferenceEngine = new InferenceEngine();
-                ryaDAO = new AccumuloRyaDAO();
-                ryaDAO.setConf(conf);
-                ryaDAO.setConnector(connector);
-                ryaDAO.init();
-
-                inferenceEngine.setRyaDAO(ryaDAO);
-                inferenceEngine.setConf(conf);
-                inferenceEngine.setSchedule(false);
-                inferenceEngine.init();
-            }
-            if (stats) {
-                logger.info("Using stats");
-                rdfEvalStatsDAO = new AccumuloRdfEvalStatsDAO();
-                rdfEvalStatsDAO.setConf(conf);
-                rdfEvalStatsDAO.setConnector(connector);
-//                rdfEvalStatsDAO.setEvalTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
-                rdfEvalStatsDAO.init();
-                rdfCloudTripleStoreEvaluationStatistics = new RdfCloudTripleStoreEvaluationStatistics(conf, rdfEvalStatsDAO);
-            }
-        }
-    }
-
-    public void destroy() throws Exception {
-        logger.info("Shutting down Sparql Query Pig Engine");
-        pigServer.shutdown();
-        if (ryaDAO != null) {
-            ryaDAO.destroy();
-        }
-        if (inferenceEngine != null) {
-            inferenceEngine.destroy();
-        }
-        if (rdfEvalStatsDAO != null) {
-            rdfEvalStatsDAO.destroy();
-        }
-    }
-
-    /**
-     * Transform a sparql query into a pig script and execute it. Save results in hdfsSaveLocation
-     *
-     * @param sparql           to execute
-     * @param hdfsSaveLocation to save the execution
-     * @throws java.io.IOException
-     */
-    public void runQuery(String sparql, String hdfsSaveLocation) throws IOException {
-        Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
-        Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null");
-        logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]");
-        pigServer.deleteFile(hdfsSaveLocation);
-        try {
-            String pigScript = generatePigScript(sparql);
-            if (logger.isDebugEnabled()) {
-                logger.debug("Pig script [" + pigScript + "]");
-            }
-            pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes()));
-            pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    public String generatePigScript(String sparql) throws Exception {
-        Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
-        QueryParser parser = new SPARQLParser();
-        ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
-        QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr());
-
-//        SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer();
-//        similarVarJoinOptimizer.optimize(tupleExpr, null, null);
-
-        if (inference || stats) {
-            if (inference) {
-                tupleExpr.visit(new TransitivePropertyVisitor(conf, inferenceEngine));
-                tupleExpr.visit(new SymmetricPropertyVisitor(conf, inferenceEngine));
-                tupleExpr.visit(new InverseOfVisitor(conf, inferenceEngine));
-            }
-            if (stats) {
-                (new QueryJoinOptimizer(rdfCloudTripleStoreEvaluationStatistics)).optimize(tupleExpr, null, null);
-            }
-        }
-
-        sparqlToPigTransformVisitor.meet(tupleExpr);
-        return sparqlToPigTransformVisitor.getPigScript();
-    }
-
-
-    public static void main(String[] args) {
-        try {
-            Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk cbuser cbpassword rdfTablePrefix.\n " +
-                    "Sample command: java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf mvm.rya.accumulo.pig.SparqlQueryPigEngine " +
-                    "tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_");
-            String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0])));
-            String hdfsSaveLocation = args[1];
-            SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
-            visitor.setTablePrefix(args[6]);
-            visitor.setInstance(args[2]);
-            visitor.setZk(args[3]);
-            visitor.setUser(args[4]);
-            visitor.setPassword(args[5]);
-
-            SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
-            engine.setSparqlToPigTransformVisitor(visitor);
-            engine.setInference(false);
-            engine.setStats(false);
-            
-            engine.init();
-
-            engine.runQuery(sparql, hdfsSaveLocation);
-
-            engine.destroy();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public String getHadoopDir() {
-        return hadoopDir;
-    }
-
-    public void setHadoopDir(String hadoopDir) {
-        this.hadoopDir = hadoopDir;
-    }
-
-    public PigServer getPigServer() {
-        return pigServer;
-    }
-
-    public void setPigServer(PigServer pigServer) {
-        this.pigServer = pigServer;
-    }
-
-    public ExecType getExecType() {
-        return execType;
-    }
-
-    public void setExecType(ExecType execType) {
-        this.execType = execType;
-    }
-
-    public boolean isInference() {
-        return inference;
-    }
-
-    public void setInference(boolean inference) {
-        this.inference = inference;
-    }
-
-    public boolean isStats() {
-        return stats;
-    }
-
-    public void setStats(boolean stats) {
-        this.stats = stats;
-    }
-
-    public SparqlToPigTransformVisitor getSparqlToPigTransformVisitor() {
-        return sparqlToPigTransformVisitor;
-    }
-
-    public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) {
-        this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor;
-    }
-}
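
The main method above already shows the intended wiring; the same flow as a compact programmatic sketch, where the instance, zookeepers, credentials, and query are placeholders, and inference/stats are switched off so init() does not need a live Accumulo connection (running the query still requires a working Pig/Hadoop setup).

    import mvm.rya.accumulo.pig.SparqlQueryPigEngine;
    import mvm.rya.accumulo.pig.SparqlToPigTransformVisitor;

    public class SparqlToPigSketch {
        public static void main(String[] args) throws Exception {
            // Configure the visitor with hypothetical Accumulo connection details.
            SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
            visitor.setTablePrefix("rya_");
            visitor.setInstance("devInstance");
            visitor.setZk("zoo1:2181");
            visitor.setUser("root");
            visitor.setPassword("secret");

            SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
            engine.setSparqlToPigTransformVisitor(visitor);
            engine.setInference(false); // inference and stats would open a ZooKeeperInstance connection
            engine.setStats(false);
            engine.init();
            try {
                String sparql = "SELECT ?s ?o WHERE { ?s <http://example.org/p> ?o }";
                // Stores the PROJ relation of the generated Pig script at this HDFS location.
                engine.runQuery(sparql, "/tmp/engineTest");
            } finally {
                engine.destroy();
            }
        }
    }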

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
deleted file mode 100644
index 38d8adb..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
+++ /dev/null
@@ -1,345 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.openrdf.model.Literal;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.query.algebra.*;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-
-import java.util.*;
-
-/**
- * Query model visitor that walks a SPARQL algebra tree and builds the equivalent Pig Latin
- * script, loading each statement pattern through {@link StatementPatternStorage} and translating
- * joins, unions, projections, and slices into Pig relations.
- */
-public class SparqlToPigTransformVisitor extends QueryModelVisitorBase<RuntimeException> {
-    private StringBuilder pigScriptBuilder = new StringBuilder();
-    private String tablePrefix;
-    private String instance, zk, user, password; //TODO: use a Configuration object to get these
-
-    private Map<String, String> varToSet = new HashMap<String, String>();
-    private Map<TupleExpr, List<String>> exprToNames = new HashMap<TupleExpr, List<String>>();
-    private Map<TupleExpr, String> exprToVar = new HashMap<TupleExpr, String>();
-
-    private char i = 'A'; //TODO: do better, hack
-
-    public SparqlToPigTransformVisitor() {
-        pigScriptBuilder.append("set pig.splitCombination false;\n")
-                .append("set default_parallel 32;\n") //TODO: set parallel properly
-                .append("set mapred.map.tasks.speculative.execution false;\n")
-                .append("set mapred.reduce.tasks.speculative.execution false;\n")
-                .append("set io.sort.mb 256;\n")
-                .append("set mapred.child.java.opts -Xmx2048m;\n")
-                .append("set mapred.compress.map.output true;\n")
-                .append("set mapred.map.output.compression.codec org.apache.hadoop.io.compress.GzipCodec;\n")
-                .append("set io.file.buffer.size 65536;\n")
-                .append("set io.sort.factor 25;\n");
-    }
-
-    @Override
-    public void meet(StatementPattern node) throws RuntimeException {
-        super.meet(node);
-        String subjValue = getVarValue(node.getSubjectVar());
-        String predValue = getVarValue(node.getPredicateVar());
-        String objValue = getVarValue(node.getObjectVar());
-
-        String subj = i + "_s";
-        String pred = i + "_p";
-        String obj = i + "_o";
-        String var = i + "";
-        if (node.getSubjectVar().getValue() == null) {                  //TODO: look nicer
-            subj = node.getSubjectVar().getName();
-            varToSet.put(subj, var);
-
-            addToExprToNames(node, subj);
-        }
-        if (node.getPredicateVar().getValue() == null) {                  //TODO: look nicer
-            pred = node.getPredicateVar().getName();
-            varToSet.put(pred, var);
-
-            addToExprToNames(node, pred);
-        }
-        if (node.getObjectVar().getValue() == null) {                  //TODO: look nicer
-            obj = node.getObjectVar().getName();
-            varToSet.put(obj, var);
-
-            addToExprToNames(node, obj);
-        }
-        if (node.getContextVar() != null && node.getContextVar().getValue() == null) {
-            String cntxtName = node.getContextVar().getName();
-            varToSet.put(cntxtName, var);
-
-            addToExprToNames(node, cntxtName);
-        }
-        //load 'l_' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('<http://www.Department0.University0.edu>', '', '',
-        // 'stratus', 'stratus13:2181', 'root', 'password') AS (dept:chararray, p:chararray, univ:chararray);
-//        pigScriptBuilder.append(i).append(" = load '").append(tablePrefix).append("' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('")
-//                .append(subjValue).append("','").append(predValue).append("','").append(objValue).append("','").append(instance).append("','")
-//                .append(zk).append("','").append(user).append("','").append(password).append("') AS (").append(subj).append(":chararray, ")
-//                .append(pred).append(":chararray, ").append(obj).append(":chararray);\n");
-
-        //load 'cloudbase://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c'
-        //using mvm.rya.accumulo.pig.StatementPatternStorage() AS (dept:chararray, p:chararray, univ:chararray);
-        pigScriptBuilder.append(i).append(" = load 'accumulo://").append(tablePrefix).append("?instance=").append(instance).append("&user=").append(user)
-                .append("&password=").append(password).append("&zookeepers=").append(zk);
-        if (subjValue != null && subjValue.length() > 0) {
-            pigScriptBuilder.append("&subject=").append(subjValue);
-        }
-        if (predValue != null && predValue.length() > 0) {
-            pigScriptBuilder.append("&predicate=").append(predValue);
-        }
-        if (objValue != null && objValue.length() > 0) {
-            pigScriptBuilder.append("&object=").append(objValue);
-        }
-        if (node.getContextVar() != null && node.getContextVar().getValue() != null) {
-            pigScriptBuilder.append("&context=").append(getVarValue(node.getContextVar()));
-        }
-
-        pigScriptBuilder.append("' using ").append(StatementPatternStorage.class.getName()).append("() AS (").append(subj).append(":chararray, ")
-                .append(pred).append(":chararray, ").append(obj).append(":chararray");
-        if (node.getContextVar() != null) {
-            Value cntxtValue = node.getContextVar().getValue();
-            String cntxtName = null;
-            if (cntxtValue == null) {
-                //use name
-                cntxtName = node.getContextVar().getName();
-            } else {
-                cntxtName = i + "_c";
-            }
-            pigScriptBuilder.append(", ").append(cntxtName).append(":chararray");
-        }
-        pigScriptBuilder.append(");\n");
-        //TODO: add auths
-
-        exprToVar.put(node, var);
-        i++;
-    }
-
-    private void addToExprToNames(TupleExpr node, String name) {
-        List<String> names = exprToNames.get(node);
-        if (names == null) {
-            names = new ArrayList<String>();
-            exprToNames.put(node, names);
-        }
-        names.add(name);
-    }
-
-    @Override
-    public void meet(Union node) throws RuntimeException {
-        super.meet(node);
-
-        TupleExpr leftArg = node.getLeftArg();
-        TupleExpr rightArg = node.getRightArg();
-        String left_var = exprToVar.get(leftArg);
-        String right_var = exprToVar.get(rightArg);
-        //Q = UNION ONSCHEMA B, P;
-        pigScriptBuilder.append(i).append(" = UNION ONSCHEMA ").append(left_var).append(", ").append(right_var).append(";\n");
-
-        String unionVar = i + "";
-        List<String> left_names = exprToNames.get(leftArg);
-        List<String> right_names = exprToNames.get(rightArg);
-        for (String name : left_names) {
-            varToSet.put(name, unionVar);
-            addToExprToNames(node, name);
-        }
-        for (String name : right_names) {
-            varToSet.put(name, unionVar);
-            addToExprToNames(node, name);
-        }
-        exprToVar.put(node, unionVar);
-        i++;
-    }
-
-    @Override
-    public void meet(Join node) throws RuntimeException {
-        super.meet(node);
-
-        TupleExpr leftArg = node.getLeftArg();
-        TupleExpr rightArg = node.getRightArg();
-        List<String> left_names = exprToNames.get(leftArg);
-        List<String> right_names = exprToNames.get(rightArg);
-
-        Set<String> joinNames = new HashSet<String>(left_names);
-        joinNames.retainAll(right_names); //intersection, this is what I join on
-        //SEC = join FIR by (MEMB_OF::ugrad, SUBORG_J::univ), UGRADDEG by (ugrad, univ);
-        StringBuilder joinStr = new StringBuilder();
-        joinStr.append("(");
-        boolean first = true;
-        for (String name : joinNames) { //TODO: Make this a utility method
-            if (!first) {
-                joinStr.append(",");
-            }
-            first = false;
-            joinStr.append(name);
-        }
-        joinStr.append(")");
-
-        String left_var = exprToVar.get(leftArg);
-        String right_var = exprToVar.get(rightArg);
-        if (joinStr.length() <= 2) {
-            //no join params, need to cross
-            pigScriptBuilder.append(i).append(" = cross ").append(left_var).append(", ").append(right_var).append(";\n");
-        } else {
-            //join
-            pigScriptBuilder.append(i).append(" = join ").append(left_var);
-            pigScriptBuilder.append(" by ").append(joinStr);
-            pigScriptBuilder.append(", ").append(right_var);
-            pigScriptBuilder.append(" by ").append(joinStr);
-            pigScriptBuilder.append(";\n");
-
-        }
-
-        String joinVarStr = i + "";
-        i++;
-        // D = foreach C GENERATE A::subj AS subj:chararray, A::A_p AS p:chararray;
-        String forEachVarStr = i + "";
-        pigScriptBuilder.append(i).append(" = foreach ").append(joinVarStr).append(" GENERATE ");
-        Map<String, String> nameToJoinName = new HashMap<String, String>();
-        for (String name : left_names) {
-            varToSet.put(name, forEachVarStr);
-            addToExprToNames(node, name);
-            nameToJoinName.put(name, left_var + "::" + name);
-        }
-        for (String name : right_names) {
-            varToSet.put(name, forEachVarStr);
-            addToExprToNames(node, name);
-            nameToJoinName.put(name, right_var + "::" + name);
-        }
-
-        first = true;
-        for (Map.Entry entry : nameToJoinName.entrySet()) {
-            if (!first) {
-                pigScriptBuilder.append(",");
-            }
-            first = false;
-            pigScriptBuilder.append(entry.getValue()).append(" AS ").append(entry.getKey()).append(":chararray ");
-        }
-        pigScriptBuilder.append(";\n");
-
-        exprToVar.put(node, forEachVarStr);
-        i++;
-    }
-
-    @Override
-    public void meet(Projection node) throws RuntimeException {
-        super.meet(node);
-        ProjectionElemList list = node.getProjectionElemList();
-        String set = null;
-        StringBuilder projList = new StringBuilder();
-        boolean first = true;
-        //TODO: we do not support projections from multiple pig statements yet
-        for (String name : list.getTargetNames()) {
-            set = varToSet.get(name);  //TODO: overwrite
-            if (set == null) {
-                throw new IllegalArgumentException("Have not found any pig logic for name[" + name + "]");
-            }
-            if (!first) {
-                projList.append(",");
-            }
-            first = false;
-            projList.append(name);
-        }
-        if (set == null)
-            throw new IllegalArgumentException("No Pig relation has been generated for projection " + list.getTargetNames());
-        //SUBORG = FOREACH SUBORG_L GENERATE dept, univ;
-        pigScriptBuilder.append("PROJ = FOREACH ").append(set).append(" GENERATE ").append(projList.toString()).append(";\n");
-    }
-
-    @Override
-    public void meet(Slice node) throws RuntimeException {
-        super.meet(node);
-        long limit = node.getLimit();
-        //PROJ = LIMIT PROJ 10;
-        pigScriptBuilder.append("PROJ = LIMIT PROJ ").append(limit).append(";\n");
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public void setUser(String user) {
-        this.user = user;
-    }
-
-    public String getZk() {
-        return zk;
-    }
-
-    public void setZk(String zk) {
-        this.zk = zk;
-    }
-
-    public String getInstance() {
-        return instance;
-    }
-
-    public void setInstance(String instance) {
-        this.instance = instance;
-    }
-
-    public String getTablePrefix() {
-        return tablePrefix;
-    }
-
-    public void setTablePrefix(String tablePrefix) {
-        this.tablePrefix = tablePrefix;
-    }
-
-    public String getPigScript() {
-        return pigScriptBuilder.toString();
-    }
-
-    protected String getVarValue(Var var) {
-        if (var == null) {
-            return "";
-        } else {
-            Value value = var.getValue();
-            if (value == null) {
-                return "";
-            }
-            if (value instanceof URI) {
-                return "<" + value.stringValue() + ">";
-            }
-            if (value instanceof Literal) {
-                Literal lit = (Literal) value;
-                if (lit.getDatatype() == null) {
-                    //string
-                    return "\\'" + value.stringValue() + "\\'";
-                }
-            }
-            return value.stringValue();
-        }
-
-    }
-}
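
To see roughly what this visitor emits without standing up a PigServer, a sketch along the lines of SparqlQueryPigEngine.generatePigScript above, with the optimizers omitted; the query and connection values are placeholders, and the exact script text depends on the visitor internals shown in the diff.

    import mvm.rya.accumulo.pig.SparqlToPigTransformVisitor;

    import org.openrdf.query.algebra.QueryRoot;
    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.sparql.SPARQLParser;

    public class PigScriptPreview {
        public static void main(String[] args) throws Exception {
            String sparql = "SELECT ?s ?o WHERE { ?s <http://example.org/p> ?o }";
            ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);

            SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
            visitor.setTablePrefix("rya_");     // placeholder table prefix
            visitor.setInstance("devInstance"); // placeholder instance
            visitor.setZk("zoo1:2181");         // placeholder zookeepers
            visitor.setUser("root");
            visitor.setPassword("secret");

            // Visit the query tree and print the generated Pig Latin.
            visitor.meet(new QueryRoot(parsed.getTupleExpr()));
            System.out.println(visitor.getPigScript());
        }
    }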

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
deleted file mode 100644
index 9ec9d45..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
+++ /dev/null
@@ -1,304 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.query.strategy.ByteRange;
-import mvm.rya.api.query.strategy.TriplePatternStrategy;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.rdftriplestore.inference.InferenceEngine;
-import mvm.rya.rdftriplestore.inference.InferenceEngineException;
-
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.openrdf.model.Resource;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-/**
- * Pig {@link org.apache.pig.LoadFunc} that loads Rya statements matching a single
- * statement pattern from the Accumulo triple tables, optionally expanding the
- * pattern with subClassOf/subPropertyOf inferences.
- */
-public class StatementPatternStorage extends AccumuloStorage {
-    private static final Log logger = LogFactory.getLog(StatementPatternStorage.class);
-    protected TABLE_LAYOUT layout;
-    protected String subject = "?s";
-    protected String predicate = "?p";
-    protected String object = "?o";
-    protected String context;
-    private Value subject_value;
-    private Value predicate_value;
-    private Value object_value;
-
-    private RyaTripleContext ryaContext;
-
-    /**
-     * whether to turn inferencing on or off
-     */
-    private boolean infer = false;
-
-    public StatementPatternStorage() {
-        if (super.conf != null) {
-            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(super.conf));
-        } else {
-            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
-        }
-    }
-
-    private Value getValue(Var subjectVar) {
-        return subjectVar.hasValue() ? subjectVar.getValue() : null;
-    }
-
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        super.setLocation(location, job);
-    }
-
-    @Override
-    protected void setLocationFromUri(String uri, Job job) throws IOException {
-        super.setLocationFromUri(uri, job);
-        // ex: accumulo://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c&context=c&infer=true
-        addStatementPatternRange(subject, predicate, object, context);
-        if (infer) {
-            addInferredRanges(table, job);
-        }
-
-        if (layout == null || ranges.size() == 0)
-            throw new IllegalArgumentException("No table layout or ranges could be derived from the statement pattern. Check the query");
-        table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table);
-        tableName = new Text(table);
-    }
-
-    @Override
-    protected void addLocationFromUriPart(String[] pair) {
-        if (pair[0].equals("subject")) {
-            this.subject = pair[1];
-        } else if (pair[0].equals("predicate")) {
-            this.predicate = pair[1];
-        } else if (pair[0].equals("object")) {
-            this.object = pair[1];
-        } else if (pair[0].equals("context")) {
-            this.context = pair[1];
-        } else if (pair[0].equals("infer")) {
-            this.infer = Boolean.parseBoolean(pair[1]);
-        }
-    }
-
-    protected void addStatementPatternRange(String subj, String pred, String obj, String ctxt) throws IOException {
-        logger.info("Adding statement pattern[subject:" + subj + ", predicate:" + pred + ", object:" + obj + ", context:" + ctxt + "]");
-        StringBuilder sparqlBuilder = new StringBuilder();
-        sparqlBuilder.append("select * where {\n");
-        if (ctxt != null) {
-            /*
-             * select * where {
-             *   GRAPH ?g {
-             *     <http://www.example.org/exampleDocument#Monica> ?p ?o.
-             *   }
-             * }
-             */
-            sparqlBuilder.append("GRAPH ").append(ctxt).append(" {\n");
-        }
-        sparqlBuilder.append(subj).append(" ").append(pred).append(" ").append(obj).append(".\n");
-        if (ctxt != null) {
-            sparqlBuilder.append("}\n");
-        }
-        sparqlBuilder.append("}\n");
-        String sparql = sparqlBuilder.toString();
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Sparql statement range[" + sparql + "]");
-        }
-
-        QueryParser parser = new SPARQLParser();
-        ParsedQuery parsedQuery = null;
-        try {
-            parsedQuery = parser.parseQuery(sparql, null);
-        } catch (MalformedQueryException e) {
-            throw new IOException(e);
-        }
-        parsedQuery.getTupleExpr().visitChildren(new QueryModelVisitorBase<IOException>() {
-            @Override
-            public void meet(StatementPattern node) throws IOException {
-                Var subjectVar = node.getSubjectVar();
-                Var predicateVar = node.getPredicateVar();
-                Var objectVar = node.getObjectVar();
-                subject_value = getValue(subjectVar);
-                predicate_value = getValue(predicateVar);
-                object_value = getValue(objectVar);
-                Var contextVar = node.getContextVar();
-                Map.Entry<TABLE_LAYOUT, Range> temp = createRange(subject_value, predicate_value, object_value);
-                layout = temp.getKey();
-                Range range = temp.getValue();
-                addRange(range);
-                if (contextVar != null && contextVar.getValue() != null) {
-                    String context_str = contextVar.getValue().stringValue();
-                    addColumnPair(context_str, "");
-                }
-            }
-        });
-    }
-
-    protected Map.Entry<TABLE_LAYOUT, Range> createRange(Value s_v, Value p_v, Value o_v) throws IOException {
-        RyaURI subject_rya = RdfToRyaConversions.convertResource((Resource) s_v);
-        RyaURI predicate_rya = RdfToRyaConversions.convertURI((URI) p_v);
-        RyaType object_rya = RdfToRyaConversions.convertValue(o_v);
-        TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null);
-        if (strategy == null)
-            return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range());
-        Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null);
-        ByteRange byteRange = entry.getValue();
-        return new RdfCloudTripleStoreUtils.CustomEntry<mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT, Range>(
-                entry.getKey(), new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()))
-        );
-    }
-
-    protected void addInferredRanges(String tablePrefix, Job job) throws IOException {
-        logger.info("Adding inferences to statement pattern[subject:" + subject_value + ", predicate:" + predicate_value + ", object:" + object_value + "]");
-        //inference engine
-        AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
-        InferenceEngine inferenceEngine = new InferenceEngine();
-        try {
-            AccumuloRdfConfiguration rdfConf = new AccumuloRdfConfiguration(job.getConfiguration());
-            rdfConf.setTablePrefix(tablePrefix);
-            ryaDAO.setConf(rdfConf);
-            try {
-                if (!mock) {
-                    ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes()));
-                } else {
-                    ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes()));
-                }
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            ryaDAO.init();
-            inferenceEngine.setConf(rdfConf);
-            inferenceEngine.setRyaDAO(ryaDAO);
-            inferenceEngine.setSchedule(false);
-            inferenceEngine.init();
-            //is it subclassof or subpropertyof
-            if (RDF.TYPE.equals(predicate_value)) {
-                //try subclassof
-                Collection<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubClassOfGraph(), (URI) object_value);
-                if (parents != null && parents.size() > 0) {
-                    //subclassof relationships found
-                    //don't add self, that will happen anyway later
-                    //add all relationships
-                    for (URI parent : parents) {
-                        Map.Entry<TABLE_LAYOUT, Range> temp =
-                                createRange(subject_value, predicate_value, parent);
-                        Range range = temp.getValue();
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Found subClassOf relationship [type:" + object_value + " is subClassOf:" + parent + "]");
-                        }
-                        addRange(range);
-                    }
-                }
-            } else if (predicate_value != null) {
-                //subpropertyof check
-                Set<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubPropertyOfGraph(), (URI) predicate_value);
-                for (URI parent : parents) {
-                    Map.Entry<TABLE_LAYOUT, Range> temp =
-                            createRange(subject_value, parent, object_value);
-                    Range range = temp.getValue();
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Found subPropertyOf relationship [type:" + predicate_value + " is subPropertyOf:" + parent + "]");
-                    }
-                    addRange(range);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Exception in adding inferred ranges", e);
-            throw new IOException(e);
-        } finally {
-            if (inferenceEngine != null) {
-                try {
-                    inferenceEngine.destroy();
-                } catch (InferenceEngineException e) {
-                    logger.error("Exception closing InferenceEngine", e);
-                }
-            }
-            if (ryaDAO != null) {
-                try {
-                    ryaDAO.destroy();
-                } catch (RyaDAOException e) {
-                    logger.error("Exception closing ryadao", e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public Tuple getNext() throws IOException {
-        try {
-            if (reader.nextKeyValue()) {
-                Key key = (Key) reader.getCurrentKey();
-                RyaStatement ryaStatement = ryaContext.deserializeTriple(layout, new TripleRow(key.getRow().getBytes(),
-                        key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
-
-                Tuple tuple = TupleFactory.getInstance().newTuple(4);
-                tuple.set(0, ryaStatement.getSubject().getData());
-                tuple.set(1, ryaStatement.getPredicate().getData());
-                tuple.set(2, ryaStatement.getObject().getData());
-                tuple.set(3, (ryaStatement.getContext() != null) ? (ryaStatement.getContext().getData()) : (null));
-                return tuple;
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        return null;
-    }
-}
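
For orientation, a hedged sketch of how a caller (or a generated Pig script)
might point this LoadFunc at a single statement pattern. The table prefix,
instance name, credentials and pattern terms below are placeholders; the
query-string keys (subject, predicate, object, context, infer) are the ones
parsed by addLocationFromUriPart(...) above, and the terms must be valid SPARQL
terms because addStatementPatternRange(...) splices them into a query. Setting
infer=true would additionally expand the pattern with subClassOf/subPropertyOf
parents via addInferredRanges(...).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.pig.StatementPatternStorage;

    public class StatementPatternLocationSketch {
        public static void main(String[] args) throws Exception {
            // Pattern: ?s <http://example.org/worksFor> <http://example.org/ACME>
            String location = "accumulo://rya_?instance=myinstance&user=root&password=secret"
                    + "&mock=true"
                    + "&predicate=<http://example.org/worksFor>"
                    + "&object=<http://example.org/ACME>";
            StatementPatternStorage storage = new StatementPatternStorage();
            Job job = new Job(new Configuration());
            // Derives the table layout and scan ranges for the pattern.
            storage.setLocation(location, job);
        }
    }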

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
deleted file mode 100644
index 4b458b6..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package mvm.rya.accumulo.pig.optimizer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.Dataset;
-import org.openrdf.query.algebra.*;
-import org.openrdf.query.algebra.evaluation.QueryOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.EvaluationStatistics;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.algebra.helpers.StatementPatternCollector;
-
-import java.util.*;
-
-/**
- * A query optimizer that re-orders nested Joins according to cardinality, preferring joins that have similar variables.
- *
- */
-public class SimilarVarJoinOptimizer implements QueryOptimizer {
-
-    protected final EvaluationStatistics statistics;
-
-    public SimilarVarJoinOptimizer() {
-        this(new EvaluationStatistics());
-    }
-
-    public SimilarVarJoinOptimizer(EvaluationStatistics statistics) {
-        this.statistics = statistics;
-    }
-
-    /**
-     * Applies generally applicable optimizations: path expressions are sorted
-     * from more to less specific.
-     *
-     * @param tupleExpr
-     */
-    public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
-        tupleExpr.visit(new JoinVisitor());
-    }
-
-    protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
-
-        Set<String> boundVars = new HashSet<String>();
-
-        @Override
-        public void meet(LeftJoin leftJoin) {
-            leftJoin.getLeftArg().visit(this);
-
-            Set<String> origBoundVars = boundVars;
-            try {
-                boundVars = new HashSet<String>(boundVars);
-                boundVars.addAll(leftJoin.getLeftArg().getBindingNames());
-
-                leftJoin.getRightArg().visit(this);
-            } finally {
-                boundVars = origBoundVars;
-            }
-        }
-
-        @Override
-        public void meet(Join node) {
-            Set<String> origBoundVars = boundVars;
-            try {
-                boundVars = new HashSet<String>(boundVars);
-
-                // Recursively get the join arguments
-                List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
-
-                // Build maps of cardinalities and vars per tuple expression
-                Map<TupleExpr, Double> cardinalityMap = new HashMap<TupleExpr, Double>();
-
-                for (TupleExpr tupleExpr : joinArgs) {
-                    double cardinality = statistics.getCardinality(tupleExpr);
-                    cardinalityMap.put(tupleExpr, cardinality);
-                }
-
-                // Reorder the (recursive) join arguments to a more optimal sequence
-                List<TupleExpr> orderedJoinArgs = new ArrayList<TupleExpr>(joinArgs.size());
-                TupleExpr last = null;
-                while (!joinArgs.isEmpty()) {
-                    TupleExpr tupleExpr = selectNextTupleExpr(joinArgs, cardinalityMap, last);
-                    if (tupleExpr == null) {
-                        break;
-                    }
-
-                    joinArgs.remove(tupleExpr);
-                    orderedJoinArgs.add(tupleExpr);
-                    last = tupleExpr;
-
-                    // Recursively optimize join arguments
-                    tupleExpr.visit(this);
-
-                    boundVars.addAll(tupleExpr.getBindingNames());
-                }
-
-                // Build new join hierarchy
-                // Note: generated hierarchy is right-recursive to help the
-                // IterativeEvaluationOptimizer to factor out the left-most join
-                // argument
-                int i = 0;
-                TupleExpr replacement = orderedJoinArgs.get(i);
-                for (i++; i < orderedJoinArgs.size(); i++) {
-                    replacement = new Join(replacement, orderedJoinArgs.get(i));
-                }
-
-                // Replace old join hierarchy
-                node.replaceWith(replacement);
-            } finally {
-                boundVars = origBoundVars;
-            }
-        }
-
-        protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) {
-            if (tupleExpr instanceof Join) {
-                Join join = (Join) tupleExpr;
-                getJoinArgs(join.getLeftArg(), joinArgs);
-                getJoinArgs(join.getRightArg(), joinArgs);
-            } else {
-                joinArgs.add(tupleExpr);
-            }
-
-            return joinArgs;
-        }
-
-        protected List<Var> getStatementPatternVars(TupleExpr tupleExpr) {
-            if(tupleExpr == null)
-                return null;
-            List<StatementPattern> stPatterns = StatementPatternCollector.process(tupleExpr);
-            List<Var> varList = new ArrayList<Var>(stPatterns.size() * 4);
-            for (StatementPattern sp : stPatterns) {
-                sp.getVars(varList);
-            }
-            return varList;
-        }
-
-        protected <M extends Map<Var, Integer>> M getVarFreqMap(List<Var> varList, M varFreqMap) {
-            for (Var var : varList) {
-                Integer freq = varFreqMap.get(var);
-                freq = (freq == null) ? 1 : freq + 1;
-                varFreqMap.put(var, freq);
-            }
-            return varFreqMap;
-        }
-
-        /**
-         * Selects from a list of tuple expressions the next tuple expression that
-         * should be evaluated. Candidates are first narrowed to the expressions
-         * that share at least one variable with the previously selected expression
-         * (see getExprsWithSameVars); among those, the expression with the lowest
-         * estimated cardinality is chosen.
-         */
-        protected TupleExpr selectNextTupleExpr(List<TupleExpr> expressions,
-                                                Map<TupleExpr, Double> cardinalityMap,
-                                                TupleExpr last) {
-            double lowestCardinality = Double.MAX_VALUE;
-            TupleExpr result = expressions.get(0);
-            expressions = getExprsWithSameVars(expressions, last);
-
-            for (TupleExpr tupleExpr : expressions) {
-                // Calculate a score for this tuple expression
-                double cardinality = cardinalityMap.get(tupleExpr);
-
-                if (cardinality < lowestCardinality) {
-                    // More specific path expression found
-                    lowestCardinality = cardinality;
-                    result = tupleExpr;
-                }
-            }
-
-            return result;
-        }
-
-        protected List<TupleExpr> getExprsWithSameVars(List<TupleExpr> expressions, TupleExpr last) {
-            if(last == null)
-                return expressions;
-            List<TupleExpr> retExprs = new ArrayList<TupleExpr>();
-            for(TupleExpr tupleExpr : expressions) {
-                List<Var> statementPatternVars = getStatementPatternVars(tupleExpr);
-                List<Var> lastVars = getStatementPatternVars(last);
-                statementPatternVars.retainAll(lastVars);
-                if(statementPatternVars.size() > 0) {
-                    retExprs.add(tupleExpr);
-                }
-            }
-            if(retExprs.size() == 0) {
-                return expressions;
-            }
-            return retExprs;
-        }
-
-    }
-}
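
Since SimilarVarJoinOptimizer is a plain Sesame QueryOptimizer, it can be applied
to any parsed query outside of the SAIL wiring. A hedged example (the SPARQL text
and IRIs are placeholders): parse a query, hand its TupleExpr to optimize(...),
and the nested joins are re-ordered in place.

    import org.openrdf.query.algebra.TupleExpr;
    import org.openrdf.query.impl.EmptyBindingSet;
    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.sparql.SPARQLParser;

    import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;

    public class JoinOptimizerSketch {
        public static void main(String[] args) throws Exception {
            String sparql = "SELECT * WHERE { "
                    + "?person <http://example.org/worksFor> ?org . "
                    + "?org <http://example.org/locatedIn> ?city . "
                    + "?person <http://example.org/name> ?name . }";
            ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
            TupleExpr expr = parsed.getTupleExpr();

            // Re-orders the join arguments in place: lowest estimated cardinality
            // first, preferring arguments that share variables with the previous one.
            new SimilarVarJoinOptimizer().optimize(expr, null, EmptyBindingSet.getInstance());
            System.out.println(expr);
        }
    }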

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
deleted file mode 100644
index 119ccb1..0000000
--- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
+++ /dev/null
@@ -1,284 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.pig.data.Tuple;
-
-/**
- * Tests for {@link AccumuloStorage}: single ranges, multiple ranges, column
- * filters, whole-row ranges, and scan authorizations, all against a MockInstance.
- */
-public class AccumuloStorageTest extends TestCase {
-
-    private String user = "user";
-    private String pwd = "pwd";
-    private String instance = "myinstance";
-    private String table = "testTable";
-    private Authorizations auths = Constants.NO_AUTHS;
-    private Connector connector;
-
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        connector = new MockInstance(instance).getConnector(user, new PasswordToken(pwd.getBytes()));
-        connector.tableOperations().create(table);
-        SecurityOperations secOps = connector.securityOperations();
-        secOps.createLocalUser(user, new PasswordToken(pwd.getBytes()));
-        secOps.grantTablePermission(user, table, TablePermission.READ);
-        secOps.grantTablePermission(user, table, TablePermission.WRITE);
-    }
-
-    @Override
-    public void tearDown() throws Exception {
-        super.tearDown();
-        connector.tableOperations().delete(table);
-    }
-
-    public void testSimpleOutput() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("row");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|z&mock=true";
-        AccumuloStorage storage = createAccumuloStorage(location);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(1, count);
-    }
-
-    public void testRange() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("a");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        row = new Mutation("b");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        row = new Mutation("d");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true";
-        AccumuloStorage storage = createAccumuloStorage(location);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(2, count);
-    }
-
-    public void testMultipleRanges() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("a");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        row = new Mutation("b");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        row = new Mutation("d");
-        row.put("cf", "cq", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&range=d|e&mock=true";
-        List<AccumuloStorage> storages = createAccumuloStorages(location);
-        assertEquals(2, storages.size());
-        AccumuloStorage storage = storages.get(0);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(2, count);
-        storage = storages.get(1);
-        count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(1, count);
-    }
-
-    public void testColumns() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("a");
-        row.put("cf1", "cq", new Value(new byte[0]));
-        row.put("cf2", "cq", new Value(new byte[0]));
-        row.put("cf3", "cq1", new Value(new byte[0]));
-        row.put("cf3", "cq2", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&columns=cf1,cf3|cq1&mock=true";
-        AccumuloStorage storage = createAccumuloStorage(location);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(2, count);
-    }
-
-    public void testWholeRowRange() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("a");
-        row.put("cf1", "cq", new Value(new byte[0]));
-        row.put("cf2", "cq", new Value(new byte[0]));
-        row.put("cf3", "cq1", new Value(new byte[0]));
-        row.put("cf3", "cq2", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a&mock=true";
-        AccumuloStorage storage = createAccumuloStorage(location);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(4, count);
-    }
-
-    public void testAuths() throws Exception {
-        BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2);
-        Mutation row = new Mutation("a");
-        row.put("cf1", "cq1", new ColumnVisibility("A"), new Value(new byte[0]));
-        row.put("cf2", "cq2", new Value(new byte[0]));
-        batchWriter.addMutation(row);
-        batchWriter.flush();
-        batchWriter.close();
-
-        String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true";
-        AccumuloStorage storage = createAccumuloStorage(location);
-        int count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(1, count);
-
-        location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&auths=A&mock=true";
-        storage = createAccumuloStorage(location);
-        count = 0;
-        while (true) {
-            Tuple next = storage.getNext();
-            if (next == null)
-                break;
-            assertEquals(6, next.size());
-            count++;
-        }
-        assertEquals(2, count);
-    }
-
-    protected AccumuloStorage createAccumuloStorage(String location) throws IOException, InterruptedException {
-        List<AccumuloStorage> accumuloStorages = createAccumuloStorages(location);
-        if (accumuloStorages.size() > 0) {
-            return accumuloStorages.get(0);
-        }
-        return null;
-    }
-
-    protected List<AccumuloStorage> createAccumuloStorages(String location) throws IOException, InterruptedException {
-        List<AccumuloStorage> accumuloStorages = new ArrayList<AccumuloStorage>();
-        AccumuloStorage storage = new AccumuloStorage();
-        InputFormat inputFormat = storage.getInputFormat();
-        Job job = new Job(new Configuration());
-        storage.setLocation(location, job);
-        List<InputSplit> splits = inputFormat.getSplits(job);
-        assertNotNull(splits);
-
-        for (InputSplit inputSplit : splits) {
-            storage = new AccumuloStorage();
-            job = new Job(new Configuration());
-            storage.setLocation(location, job);
-            TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
-                    new TaskAttemptID("jtid", 0, false, 0, 0));
-            RecordReader recordReader = inputFormat.createRecordReader(inputSplit,
-                    taskAttemptContext);
-            recordReader.initialize(inputSplit, taskAttemptContext);
-
-            storage.prepareToRead(recordReader, null);
-            accumuloStorages.add(storage);
-        }
-        return accumuloStorages;
-    }
-}
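
The tests above double as documentation for the AccumuloStorage location URI. A
hedged summary of the query-string keys they exercise (all values below are
placeholders taken from the tests):

    public class AccumuloStorageLocationSketch {
        public static void main(String[] args) {
            // range=a|z            rows from "a" through "z"; a bare range=a reads the whole row "a"
            // range=..&range=..    repeating the key adds ranges, which become separate input splits
            // columns=cf1,cf3|cq1  fetch all of column family cf1 plus only the cf3:cq1 column
            // auths=A              scan authorizations to use (no authorizations if omitted)
            // mock=true            back the storage with a MockInstance instead of ZooKeeper
            String location = "accumulo://testTable?instance=myinstance&user=user&password=pwd"
                    + "&range=a|c&columns=cf1,cf3|cq1&auths=A&mock=true";
            System.out.println(location);
        }
    }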