Posted to dev@rya.apache.org by mi...@apache.org on 2015/12/07 13:04:35 UTC
[05/51] [partial] incubator-rya git commit: Cannot delete temp branch, doc'd it.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
deleted file mode 100644
index 054146d..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java
+++ /dev/null
@@ -1,383 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.OrderedLoadFunc;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * <p/>
- * A Key/Value pair is returned as the tuple (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray; timestamp is a long.
- * <p/>
- * Tuples can be written in two forms:
- * (key, colfam, colqual, colvis, value)
- * OR
- * (key, colfam, colqual, value)
- */
-public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, OrderedLoadFunc {
- private static final Log logger = LogFactory.getLog(AccumuloStorage.class);
-
- protected Configuration conf;
- protected RecordReader<Key, Value> reader;
- protected RecordWriter<Text, Mutation> writer;
-
- protected String inst;
- protected String zookeepers;
- protected String user = "";
- protected String password = "";
- protected String table;
- protected Text tableName;
- protected String auths;
- protected Authorizations authorizations = Constants.NO_AUTHS;
- protected List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>();
-
- protected Collection<Range> ranges = new ArrayList<Range>();
- protected boolean mock = false;
-
- public AccumuloStorage() {
- }
-
- @Override
- public Tuple getNext() throws IOException {
- try {
- // load the next pair
- if (!reader.nextKeyValue()) {
- logger.info("Reached end of results");
- return null;
- }
-
- Key key = (Key) reader.getCurrentKey();
- Value value = (Value) reader.getCurrentValue();
- assert key != null && value != null;
-
- if (logger.isTraceEnabled()) {
- logger.trace("Found key[" + key + "] and value[" + value + "]");
- }
-
- // and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(6);
- tuple.set(0, new DataByteArray(key.getRow().getBytes()));
- tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
- tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
- tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
- tuple.set(4, key.getTimestamp());
- tuple.set(5, new DataByteArray(value.get()));
- if (logger.isTraceEnabled()) {
- logger.trace("Output tuple[" + tuple + "]");
- }
- return tuple;
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public InputFormat getInputFormat() {
- return new AccumuloInputFormat();
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
- this.reader = reader;
- }
-
- @Override
- public void setLocation(String location, Job job) throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]");
- }
- conf = job.getConfiguration();
- setLocationFromUri(location, job);
-
- if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
- try {
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
- throw new RuntimeException(e);
- }
- AccumuloInputFormat.setInputTableName(job, table);
- AccumuloInputFormat.setScanAuthorizations(job, authorizations);
- if (!mock) {
- AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
- } else {
- AccumuloInputFormat.setMockInstance(job, inst);
- }
- }
- if (columnFamilyColumnQualifierPairs.size() > 0)
- AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
- logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " +
- "for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]");
-
- if (ranges.size() == 0) {
- throw new IOException("Accumulo Range must be specified");
- }
- AccumuloInputFormat.setRanges(job, ranges);
- }
-
- protected void setLocationFromUri(String uri, Job job) throws IOException {
- // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true
- try {
- if (!uri.startsWith("accumulo://"))
- throw new Exception("Bad scheme.");
- String[] urlParts = uri.split("\\?");
- setLocationFromUriParts(urlParts);
-
- } catch (Exception e) {
- throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e);
- }
- }
-
- protected void setLocationFromUriParts(String[] urlParts) {
- String columns = "";
- if (urlParts.length > 1) {
- for (String param : urlParts[1].split("&")) {
- String[] pair = param.split("=");
- if (pair[0].equals("instance")) {
- inst = pair[1];
- } else if (pair[0].equals("user")) {
- user = pair[1];
- } else if (pair[0].equals("password")) {
- password = pair[1];
- } else if (pair[0].equals("zookeepers")) {
- zookeepers = pair[1];
- } else if (pair[0].equals("auths")) {
- auths = pair[1];
- } else if (pair[0].equals("columns")) {
- columns = pair[1];
- } else if (pair[0].equals("range")) {
- String[] r = pair[1].split("\\|");
- if (r.length == 2) {
- addRange(new Range(r[0], r[1]));
- } else {
- addRange(new Range(r[0]));
- }
- } else if (pair[0].equals("mock")) {
- this.mock = Boolean.parseBoolean(pair[1]);
- }
- addLocationFromUriPart(pair);
- }
- }
- String[] parts = urlParts[0].split("/+");
- table = parts[1];
- tableName = new Text(table);
-
- if (auths == null || auths.equals("")) {
- authorizations = new Authorizations();
- } else {
- authorizations = new Authorizations(auths.split(","));
- }
-
- if (!columns.equals("")) {
- for (String cfCq : columns.split(",")) {
- if (cfCq.contains("|")) {
- String[] c = cfCq.split("\\|");
- String cf = c[0];
- String cq = c[1];
- addColumnPair(cf, cq);
- } else {
- addColumnPair(cfCq, null);
- }
- }
- }
- }
-
- protected void addColumnPair(String cf, String cq) {
- columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? new Text(cq) : null));
- }
-
- protected void addLocationFromUriPart(String[] pair) {
-
- }
-
- protected void addRange(Range range) {
- ranges.add(range);
- }
-
- @Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
- return location;
- }
-
- @Override
- public void setUDFContextSignature(String signature) {
-
- }
-
- /* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature) {
-
- }
-
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
- return relativeToAbsolutePath(location, curDir);
- }
-
- public void setStoreLocation(String location, Job job) throws IOException {
- conf = job.getConfiguration();
- setLocationFromUri(location, job);
-
- if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
- try {
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
- throw new RuntimeException(e);
- }
- AccumuloOutputFormat.setDefaultTableName(job, table);
- AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
- BatchWriterConfig config = new BatchWriterConfig();
- config.setMaxLatency(10, TimeUnit.SECONDS);
- config.setMaxMemory(10 * 1000 * 1000);
- config.setMaxWriteThreads(10);
- AccumuloOutputFormat.setBatchWriterOptions(job, config);
- }
- }
-
- public OutputFormat getOutputFormat() {
- return new AccumuloOutputFormat();
- }
-
- public void checkSchema(ResourceSchema schema) throws IOException {
- // we don't care about types; they all get cast to ByteBuffers
- }
-
- public void prepareToWrite(RecordWriter writer) {
- this.writer = writer;
- }
-
- public void putNext(Tuple t) throws ExecException, IOException {
- Mutation mut = new Mutation(objToText(t.get(0)));
- Text cf = objToText(t.get(1));
- Text cq = objToText(t.get(2));
-
- if (t.size() > 4) {
- Text cv = objToText(t.get(3));
- Value val = new Value(objToBytes(t.get(4)));
- if (cv.getLength() == 0) {
- mut.put(cf, cq, val);
- } else {
- mut.put(cf, cq, new ColumnVisibility(cv), val);
- }
- } else {
- Value val = new Value(objToBytes(t.get(3)));
- mut.put(cf, cq, val);
- }
-
- try {
- writer.write(tableName, mut);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private static Text objToText(Object o) {
- return new Text(objToBytes(o));
- }
-
- private static byte[] objToBytes(Object o) {
- if (o instanceof String) {
- String str = (String) o;
- return str.getBytes();
- } else if (o instanceof Long) {
- Long l = (Long) o;
- return l.toString().getBytes();
- } else if (o instanceof Integer) {
- Integer l = (Integer) o;
- return l.toString().getBytes();
- } else if (o instanceof Boolean) {
- Boolean l = (Boolean) o;
- return l.toString().getBytes();
- } else if (o instanceof Float) {
- Float l = (Float) o;
- return l.toString().getBytes();
- } else if (o instanceof Double) {
- Double l = (Double) o;
- return l.toString().getBytes();
- }
-
- // TODO: handle DataBag, Map<Object, Object>, and Tuple
-
- return ((DataByteArray) o).get();
- }
-
- public void cleanupOnFailure(String failure, Job job) {
- }
-
- @Override
- public WritableComparable<?> getSplitComparable(InputSplit inputSplit) throws IOException {
- //cannot get access to the range directly
- AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
- rangeInputSplit.write(out);
- out.close();
- DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Range range = new Range();
- range.readFields(stream);
- stream.close();
- return range;
- }
-}
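
For reference, a minimal sketch of wiring this LoadFunc up by hand, using the location-URI format documented in setLocationFromUri above. The table, instance, and credential values are placeholders, not values taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.pig.AccumuloStorage;

    public class AccumuloStorageSketch {
        public static void main(String[] args) throws Exception {
            AccumuloStorage storage = new AccumuloStorage();
            Job job = new Job(new Configuration());
            // setLocation parses the table name, connection info, auths,
            // column pairs, and scan ranges out of the URI; at least one
            // range= parameter is required, otherwise an IOException is thrown.
            storage.setLocation("accumulo://table1?instance=myinstance"
                    + "&user=root&password=secret&zookeepers=127.0.0.1:2181"
                    + "&auths=PRIVATE,PUBLIC&columns=col1|cq1&range=a|z", job);
        }
    }

From Pig the same URI would appear in a LOAD statement ending in "using mvm.rya.accumulo.pig.AccumuloStorage();", with tuples coming back in the six-field form described in the class javadoc.
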
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
deleted file mode 100644
index 392c108..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java
+++ /dev/null
@@ -1,348 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.regex.Pattern;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.ProjectionElem;
-import org.openrdf.query.algebra.ProjectionElemList;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class IndexWritingTool extends Configured implements Tool {
-
- private static final String sparql_key = "SPARQL.VALUE";
- private static final String cardCounter = "count";
-
-
- public static void main(String[] args) throws Exception {
-
- ToolRunner.run(new Configuration(), new IndexWritingTool(), args);
-
- }
-
- @Override
- public int run(final String[] args) throws Exception {
- Preconditions.checkArgument(args.length == 7, "java " + IndexWritingTool.class.getCanonicalName()
- + " hdfsSaveLocation sparqlFile cbinstance cbzk cbuser cbpassword rdfTablePrefix.");
-
- final String inputDir = args[0];
- final String sparqlFile = args[1];
- final String instStr = args[2];
- final String zooStr = args[3];
- final String userStr = args[4];
- final String passStr = args[5];
- final String tablePrefix = args[6];
-
- String sparql = FileUtils.readFileToString(new File(sparqlFile));
-
- Job job = new Job(getConf(), "Write HDFS Index to Accumulo");
- job.setJarByClass(this.getClass());
-
- Configuration jobConf = job.getConfiguration();
- jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
- setVarOrders(sparql, jobConf);
-
- TextInputFormat.setInputPaths(job, inputDir);
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(MyMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Mutation.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
-
- job.setNumReduceTasks(0);
-
- String tableName;
- if (zooStr.equals("mock")) {
- tableName = tablePrefix;
- } else {
- tableName = tablePrefix + "INDEX_" + UUID.randomUUID().toString().replace("-", "").toUpperCase();
- }
- setAccumuloOutput(instStr, zooStr, userStr, passStr, job, tableName);
-
- jobConf.set(sparql_key, sparql);
-
- int complete = job.waitForCompletion(true) ? 0 : -1;
-
- if (complete == 0) {
-
- String[] varOrders = jobConf.getStrings("varOrders");
- String orders = Joiner.on("\u0000").join(varOrders);
- Instance inst;
-
- if (zooStr.equals("mock")) {
- inst = new MockInstance(instStr);
- } else {
- inst = new ZooKeeperInstance(instStr, zooStr);
- }
-
- Connector conn = inst.getConnector(userStr, passStr.getBytes());
- BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1);
-
- Counters counters = job.getCounters();
- Counter c1 = counters.findCounter(cardCounter, cardCounter);
-
- Mutation m = new Mutation("~SPARQL");
- Value v = new Value(sparql.getBytes());
- m.put(new Text("" + c1.getValue()), new Text(orders), v);
- bw.addMutation(m);
-
- bw.close();
-
- return complete;
- } else {
- return complete;
- }
-
-
- }
-
-
- public void setVarOrders(String s, Configuration conf) throws MalformedQueryException {
-
- SPARQLParser parser = new SPARQLParser();
- TupleExpr query = parser.parseQuery(s, null).getTupleExpr();
-
- List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames());
- String projElems = Joiner.on(";").join(projList);
- conf.set("projElems", projElems);
-
- Pattern splitPattern1 = Pattern.compile("\n");
- Pattern splitPattern2 = Pattern.compile(",");
- String[] lines = splitPattern1.split(s);
-
- List<String> varOrders = Lists.newArrayList();
- List<String> varOrderPos = Lists.newArrayList();
-
- int orderNum = 0;
- int projSizeSq = projList.size()*projList.size();
-
- for (String t : lines) {
-
-
- if(orderNum > projSizeSq){
- break;
- }
-
- String[] order = null;
- if (t.startsWith("#prefix")) {
- t = t.substring(7).trim();
- order = splitPattern2.split(t, projList.size());
- }
-
-
- String tempVarOrder = "";
- String tempVarOrderPos = "";
-
- if (order != null) {
- for (String u : order) {
- if (tempVarOrder.length() == 0) {
- tempVarOrder = u.trim();
- } else {
- tempVarOrder = tempVarOrder + ";" + u.trim();
- }
- int pos = projList.indexOf(u.trim());
- if (pos < 0) {
- throw new IllegalArgumentException("Invalid variable order!");
- } else {
- if (tempVarOrderPos.length() == 0) {
- tempVarOrderPos = tempVarOrderPos + pos;
- } else {
- tempVarOrderPos = tempVarOrderPos + ";" + pos;
- }
- }
- }
-
- varOrders.add(tempVarOrder);
- varOrderPos.add(tempVarOrderPos);
- }
-
- if(tempVarOrder.length() > 0) {
- orderNum++;
- }
-
- }
-
- if(orderNum == 0) {
- varOrders.add(projElems);
- String tempVarPos = "";
-
- for(int i = 0; i < projList.size(); i++) {
- if(i == 0) {
- tempVarPos = Integer.toString(0);
- } else {
- tempVarPos = tempVarPos + ";" + i;
- }
- }
- varOrderPos.add(tempVarPos);
-
- }
-
- String[] vOrders = varOrders.toArray(new String[varOrders.size()]);
- String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]);
-
-
-
- conf.setStrings("varOrders", vOrders);
- conf.setStrings("varOrderPos", vOrderPos);
-
- }
-
-
- private static void setAccumuloOutput(String instStr, String zooStr, String userStr, String passStr, Job job, String tableName)
- throws AccumuloSecurityException {
-
- AuthenticationToken token = new PasswordToken(passStr);
- AccumuloOutputFormat.setConnectorInfo(job, userStr, token);
- AccumuloOutputFormat.setDefaultTableName(job, tableName);
- AccumuloOutputFormat.setCreateTables(job, true);
- //TODO best way to do this?
-
- if (zooStr.equals("mock")) {
- AccumuloOutputFormat.setMockInstance(job, instStr);
- } else {
- AccumuloOutputFormat.setZooKeeperInstance(job, instStr, zooStr);
- }
-
- job.setOutputFormatClass(AccumuloOutputFormat.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
- }
-
- public static class MyMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-
- private static final Logger logger = Logger.getLogger(MyMapper.class);
- final static Text EMPTY_TEXT = new Text();
- final static Value EMPTY_VALUE = new Value(new byte[] {});
- private String[] varOrderPos = null;
- private String[] projElem = null;
- private Pattern splitPattern = null;
- private List<List<Integer>> varPositions = Lists.newArrayList();
-
-
-
- @Override
- protected void setup(Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException,
- InterruptedException {
-
- Configuration conf = context.getConfiguration();
-
- varOrderPos = conf.getStrings("varOrderPos");
- splitPattern = Pattern.compile("\t");
-
- for (String s : varOrderPos) {
- String[] pos = s.split(";");
- List<Integer> intPos = Lists.newArrayList();
- int i = 0;
- for(String t: pos) {
- i = Integer.parseInt(t);
- intPos.add(i);
- }
-
- varPositions.add(intPos);
-
- }
-
- projElem = conf.get("projElems").split(";");
-
- super.setup(context);
- }
-
-
-
-
-
-
- @Override
- public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
-
- String[] result = splitPattern.split(value.toString());
-
-
- for (List<Integer> list : varPositions) {
-
- String values = "";
- String vars = "";
-
- for (Integer i : list) {
-
- if (values.length() == 0) {
- values = result[i];
- vars = projElem[i];
- } else {
- values = values + "\u0000" + result[i];
- vars = vars + "\u0000" + projElem[i];
- }
-
- }
- Mutation m = new Mutation(new Text(values));
- m.put(new Text(vars), EMPTY_TEXT, EMPTY_VALUE);
- output.write(EMPTY_TEXT, m);
-
- }
- output.getCounter(cardCounter, cardCounter).increment(1);
-
- }
- }
-
-}
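
A hedged sketch of driving the tool programmatically, mirroring the seven-argument contract checked in run(); every value below is a placeholder, and passing "mock" as the ZooKeeper argument selects a MockInstance as the code above shows:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import mvm.rya.accumulo.pig.IndexWritingTool;

    public class IndexWritingToolSketch {
        public static void main(String[] args) throws Exception {
            int exit = ToolRunner.run(new Configuration(), new IndexWritingTool(),
                    new String[] {
                            "/tmp/indexRows",     // HDFS directory of precomputed index rows
                            "/tmp/query.sparql",  // file holding the SPARQL query
                            "myinstance",         // Accumulo instance name
                            "zoo1:2181",          // ZooKeeper hosts
                            "root",               // user
                            "secret",             // password
                            "rya_"                // prefix for the generated index table
                    });
            System.exit(exit);
        }
    }
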
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
deleted file mode 100644
index ed8134d..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfEvalStatsDAO;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;
-import mvm.rya.rdftriplestore.evaluation.QueryJoinOptimizer;
-import mvm.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics;
-import mvm.rya.rdftriplestore.inference.InferenceEngine;
-import mvm.rya.rdftriplestore.inference.InverseOfVisitor;
-import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
-import mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.openrdf.query.algebra.QueryRoot;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Transforms a SPARQL query into a Pig script and runs it against an
- * Accumulo-backed Rya store, optionally applying inference and
- * statistics-based join optimization first.
- */
-public class SparqlQueryPigEngine {
- private static final Log logger = LogFactory.getLog(SparqlQueryPigEngine.class);
-
- private String hadoopDir;
- private ExecType execType = ExecType.MAPREDUCE; //default to mapreduce
- private boolean inference = true;
- private boolean stats = true;
- private SparqlToPigTransformVisitor sparqlToPigTransformVisitor;
- private PigServer pigServer;
- private InferenceEngine inferenceEngine = null;
- private RdfCloudTripleStoreEvaluationStatistics rdfCloudTripleStoreEvaluationStatistics;
- private AccumuloRyaDAO ryaDAO;
- AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-
- private AccumuloRdfEvalStatsDAO rdfEvalStatsDAO;
-
- public AccumuloRdfConfiguration getConf() {
- return conf;
- }
-
- public void setConf(AccumuloRdfConfiguration conf) {
- this.conf = conf;
- }
-
- public void init() throws Exception {
- Preconditions.checkNotNull(sparqlToPigTransformVisitor, "Sparql To Pig Transform Visitor must not be null");
- logger.info("Initializing Sparql Query Pig Engine");
- if (hadoopDir != null) {
- //set hadoop dir property
- System.setProperty("HADOOPDIR", hadoopDir);
- }
- //TODO: Maybe have validation of the HadoopDir system property
-
- if (pigServer == null) {
- pigServer = new PigServer(execType);
- }
-
- if (inference || stats) {
- String instance = sparqlToPigTransformVisitor.getInstance();
- String zoo = sparqlToPigTransformVisitor.getZk();
- String user = sparqlToPigTransformVisitor.getUser();
- String pass = sparqlToPigTransformVisitor.getPassword();
-
- Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes());
-
- String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix();
- conf.setTablePrefix(tablePrefix);
- if (inference) {
- logger.info("Using inference");
- inferenceEngine = new InferenceEngine();
- ryaDAO = new AccumuloRyaDAO();
- ryaDAO.setConf(conf);
- ryaDAO.setConnector(connector);
- ryaDAO.init();
-
- inferenceEngine.setRyaDAO(ryaDAO);
- inferenceEngine.setConf(conf);
- inferenceEngine.setSchedule(false);
- inferenceEngine.init();
- }
- if (stats) {
- logger.info("Using stats");
- rdfEvalStatsDAO = new AccumuloRdfEvalStatsDAO();
- rdfEvalStatsDAO.setConf(conf);
- rdfEvalStatsDAO.setConnector(connector);
-// rdfEvalStatsDAO.setEvalTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
- rdfEvalStatsDAO.init();
- rdfCloudTripleStoreEvaluationStatistics = new RdfCloudTripleStoreEvaluationStatistics(conf, rdfEvalStatsDAO);
- }
- }
- }
-
- public void destroy() throws Exception {
- logger.info("Shutting down Sparql Query Pig Engine");
- pigServer.shutdown();
- if (ryaDAO != null) {
- ryaDAO.destroy();
- }
- if (inferenceEngine != null) {
- inferenceEngine.destroy();
- }
- if (rdfEvalStatsDAO != null) {
- rdfEvalStatsDAO.destroy();
- }
- }
-
- /**
- * Transform a SPARQL query into a Pig script and execute it, saving the results to hdfsSaveLocation.
- *
- * @param sparql           the SPARQL query to execute
- * @param hdfsSaveLocation the HDFS location where results are saved
- * @throws java.io.IOException
- */
- public void runQuery(String sparql, String hdfsSaveLocation) throws IOException {
- Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
- Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null");
- logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]");
- pigServer.deleteFile(hdfsSaveLocation);
- try {
- String pigScript = generatePigScript(sparql);
- if (logger.isDebugEnabled()) {
- logger.debug("Pig script [" + pigScript + "]");
- }
- pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes()));
- pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- public String generatePigScript(String sparql) throws Exception {
- Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
- QueryParser parser = new SPARQLParser();
- ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
- QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr());
-
-// SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer();
-// similarVarJoinOptimizer.optimize(tupleExpr, null, null);
-
- if (inference || stats) {
- if (inference) {
- tupleExpr.visit(new TransitivePropertyVisitor(conf, inferenceEngine));
- tupleExpr.visit(new SymmetricPropertyVisitor(conf, inferenceEngine));
- tupleExpr.visit(new InverseOfVisitor(conf, inferenceEngine));
- }
- if (stats) {
- (new QueryJoinOptimizer(rdfCloudTripleStoreEvaluationStatistics)).optimize(tupleExpr, null, null);
- }
- }
-
- sparqlToPigTransformVisitor.meet(tupleExpr);
- return sparqlToPigTransformVisitor.getPigScript();
- }
-
-
- public static void main(String[] args) {
- try {
- Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk cbuser cbpassword rdfTablePrefix.\n " +
- "Sample command: java -cp java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf mvm.rya.accumulo.pig.SparqlQueryPigEngine " +
- "tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_");
- String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0])));
- String hdfsSaveLocation = args[1];
- SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
- visitor.setTablePrefix(args[6]);
- visitor.setInstance(args[2]);
- visitor.setZk(args[3]);
- visitor.setUser(args[4]);
- visitor.setPassword(args[5]);
-
- SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
- engine.setSparqlToPigTransformVisitor(visitor);
- engine.setInference(false);
- engine.setStats(false);
-
- engine.init();
-
- engine.runQuery(sparql, hdfsSaveLocation);
-
- engine.destroy();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public String getHadoopDir() {
- return hadoopDir;
- }
-
- public void setHadoopDir(String hadoopDir) {
- this.hadoopDir = hadoopDir;
- }
-
- public PigServer getPigServer() {
- return pigServer;
- }
-
- public void setPigServer(PigServer pigServer) {
- this.pigServer = pigServer;
- }
-
- public ExecType getExecType() {
- return execType;
- }
-
- public void setExecType(ExecType execType) {
- this.execType = execType;
- }
-
- public boolean isInference() {
- return inference;
- }
-
- public void setInference(boolean inference) {
- this.inference = inference;
- }
-
- public boolean isStats() {
- return stats;
- }
-
- public void setStats(boolean stats) {
- this.stats = stats;
- }
-
- public SparqlToPigTransformVisitor getSparqlToPigTransformVisitor() {
- return sparqlToPigTransformVisitor;
- }
-
- public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) {
- this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor;
- }
-}
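
A minimal sketch, modeled on the main() method above, of building the Pig script without submitting a job; the connection details are placeholders, and ExecType.LOCAL is chosen here only so the sketch needs no live cluster:

    import org.apache.pig.ExecType;

    import mvm.rya.accumulo.pig.SparqlQueryPigEngine;
    import mvm.rya.accumulo.pig.SparqlToPigTransformVisitor;

    public class GeneratePigScriptSketch {
        public static void main(String[] args) throws Exception {
            SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
            visitor.setTablePrefix("rya_");
            visitor.setInstance("myinstance");
            visitor.setZk("zoo1:2181");
            visitor.setUser("root");
            visitor.setPassword("secret");

            SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
            engine.setSparqlToPigTransformVisitor(visitor);
            engine.setExecType(ExecType.LOCAL);
            engine.setInference(false); // with inference and stats off, init() opens no connector
            engine.setStats(false);
            engine.init();

            // generatePigScript only builds the script text; runQuery(sparql,
            // hdfsSaveLocation) would register and execute it as well.
            System.out.println(engine.generatePigScript(
                    "SELECT * WHERE { ?s ?p ?o }"));
            engine.destroy();
        }
    }
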
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
deleted file mode 100644
index 38d8adb..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java
+++ /dev/null
@@ -1,345 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.openrdf.model.Literal;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.query.algebra.*;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-
-import java.util.*;
-
-/**
- * Query model visitor that walks a parsed SPARQL expression and emits an
- * equivalent Pig Latin script: statement patterns become
- * StatementPatternStorage loads, while joins, unions, projections, and
- * slices map to their Pig counterparts.
- */
-public class SparqlToPigTransformVisitor extends QueryModelVisitorBase<RuntimeException> {
- private StringBuilder pigScriptBuilder = new StringBuilder();
- private String tablePrefix;
- private String instance, zk, user, password; //TODO: use a Configuration object to get these
-
- private Map<String, String> varToSet = new HashMap<String, String>();
- private Map<TupleExpr, List<String>> exprToNames = new HashMap<TupleExpr, List<String>>();
- private Map<TupleExpr, String> exprToVar = new HashMap<TupleExpr, String>();
-
- private char i = 'A'; //TODO: do better, hack
-
- public SparqlToPigTransformVisitor() {
- pigScriptBuilder.append("set pig.splitCombination false;\n")
- .append("set default_parallel 32;\n") //TODO: set parallel properly
- .append("set mapred.map.tasks.speculative.execution false;\n")
- .append("set mapred.reduce.tasks.speculative.execution false;\n")
- .append("set io.sort.mb 256;\n")
- .append("set mapred.child.java.opts -Xmx2048m;\n")
- .append("set mapred.compress.map.output true;\n")
- .append("set mapred.map.output.compression.codec org.apache.hadoop.io.compress.GzipCodec;\n")
- .append("set io.file.buffer.size 65536;\n")
- .append("set io.sort.factor 25;\n");
- }
-
- @Override
- public void meet(StatementPattern node) throws RuntimeException {
- super.meet(node);
- String subjValue = getVarValue(node.getSubjectVar());
- String predValue = getVarValue(node.getPredicateVar());
- String objValue = getVarValue(node.getObjectVar());
-
- String subj = i + "_s";
- String pred = i + "_p";
- String obj = i + "_o";
- String var = i + "";
- if (node.getSubjectVar().getValue() == null) { //TODO: look nicer
- subj = node.getSubjectVar().getName();
- varToSet.put(subj, var);
-
- addToExprToNames(node, subj);
- }
- if (node.getPredicateVar().getValue() == null) { //TODO: look nicer
- pred = node.getPredicateVar().getName();
- varToSet.put(pred, var);
-
- addToExprToNames(node, pred);
- }
- if (node.getObjectVar().getValue() == null) { //TODO: look nicer
- obj = node.getObjectVar().getName();
- varToSet.put(obj, var);
-
- addToExprToNames(node, obj);
- }
- if (node.getContextVar() != null && node.getContextVar().getValue() == null) {
- String cntxtName = node.getContextVar().getName();
- varToSet.put(cntxtName, var);
-
- addToExprToNames(node, cntxtName);
- }
- //load 'l_' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('<http://www.Department0.University0.edu>', '', '',
- // 'stratus', 'stratus13:2181', 'root', 'password') AS (dept:chararray, p:chararray, univ:chararray);
-// pigScriptBuilder.append(i).append(" = load '").append(tablePrefix).append("' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('")
-// .append(subjValue).append("','").append(predValue).append("','").append(objValue).append("','").append(instance).append("','")
-// .append(zk).append("','").append(user).append("','").append(password).append("') AS (").append(subj).append(":chararray, ")
-// .append(pred).append(":chararray, ").append(obj).append(":chararray);\n");
-
- //load 'cloudbase://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c'
- //using mvm.rya.accumulo.pig.StatementPatternStorage() AS (dept:chararray, p:chararray, univ:chararray);
- pigScriptBuilder.append(i).append(" = load 'accumulo://").append(tablePrefix).append("?instance=").append(instance).append("&user=").append(user)
- .append("&password=").append(password).append("&zookeepers=").append(zk);
- if (subjValue != null && subjValue.length() > 0) {
- pigScriptBuilder.append("&subject=").append(subjValue);
- }
- if (predValue != null && predValue.length() > 0) {
- pigScriptBuilder.append("&predicate=").append(predValue);
- }
- if (objValue != null && objValue.length() > 0) {
- pigScriptBuilder.append("&object=").append(objValue);
- }
- if (node.getContextVar() != null && node.getContextVar().getValue() != null) {
- pigScriptBuilder.append("&context=").append(getVarValue(node.getContextVar()));
- }
-
- pigScriptBuilder.append("' using ").append(StatementPatternStorage.class.getName()).append("() AS (").append(subj).append(":chararray, ")
- .append(pred).append(":chararray, ").append(obj).append(":chararray");
- if (node.getContextVar() != null) {
- Value cntxtValue = node.getContextVar().getValue();
- String cntxtName = null;
- if (cntxtValue == null) {
- //use name
- cntxtName = node.getContextVar().getName();
- } else {
- cntxtName = i + "_c";
- }
- pigScriptBuilder.append(", ").append(cntxtName).append(":chararray");
- }
- pigScriptBuilder.append(");\n");
- //TODO: add auths
-
- exprToVar.put(node, var);
- i++;
- }
-
- private void addToExprToNames(TupleExpr node, String name) {
- List<String> names = exprToNames.get(node);
- if (names == null) {
- names = new ArrayList<String>();
- exprToNames.put(node, names);
- }
- names.add(name);
- }
-
- @Override
- public void meet(Union node) throws RuntimeException {
- super.meet(node);
-
- TupleExpr leftArg = node.getLeftArg();
- TupleExpr rightArg = node.getRightArg();
- String left_var = exprToVar.get(leftArg);
- String right_var = exprToVar.get(rightArg);
- //Q = UNION ONSCHEMA B, P;
- pigScriptBuilder.append(i).append(" = UNION ONSCHEMA ").append(left_var).append(", ").append(right_var).append(";\n");
-
- String unionVar = i + "";
- List<String> left_names = exprToNames.get(leftArg);
- List<String> right_names = exprToNames.get(rightArg);
- for (String name : left_names) {
- varToSet.put(name, unionVar);
- addToExprToNames(node, name);
- }
- for (String name : right_names) {
- varToSet.put(name, unionVar);
- addToExprToNames(node, name);
- }
- exprToVar.put(node, unionVar);
- i++;
- }
-
- @Override
- public void meet(Join node) throws RuntimeException {
- super.meet(node);
-
- TupleExpr leftArg = node.getLeftArg();
- TupleExpr rightArg = node.getRightArg();
- List<String> left_names = exprToNames.get(leftArg);
- List<String> right_names = exprToNames.get(rightArg);
-
- Set<String> joinNames = new HashSet<String>(left_names);
- joinNames.retainAll(right_names); //intersection: the variables to join on
- //SEC = join FIR by (MEMB_OF::ugrad, SUBORG_J::univ), UGRADDEG by (ugrad, univ);
- StringBuilder joinStr = new StringBuilder();
- joinStr.append("(");
- boolean first = true;
- for (String name : joinNames) { //TODO: Make this a utility method
- if (!first) {
- joinStr.append(",");
- }
- first = false;
- joinStr.append(name);
- }
- joinStr.append(")");
-
- String left_var = exprToVar.get(leftArg);
- String right_var = exprToVar.get(rightArg);
- if (joinStr.length() <= 2) {
- //no join params, need to cross
- pigScriptBuilder.append(i).append(" = cross ").append(left_var).append(", ").append(right_var).append(";\n");
- } else {
- //join
- pigScriptBuilder.append(i).append(" = join ").append(left_var);
- pigScriptBuilder.append(" by ").append(joinStr);
- pigScriptBuilder.append(", ").append(right_var);
- pigScriptBuilder.append(" by ").append(joinStr);
- pigScriptBuilder.append(";\n");
-
- }
-
- String joinVarStr = i + "";
- i++;
- // D = foreach C GENERATE A::subj AS subj:chararray, A::A_p AS p:chararray;
- String forEachVarStr = i + "";
- pigScriptBuilder.append(i).append(" = foreach ").append(joinVarStr).append(" GENERATE ");
- Map<String, String> nameToJoinName = new HashMap<String, String>();
- for (String name : left_names) {
- varToSet.put(name, forEachVarStr);
- addToExprToNames(node, name);
- nameToJoinName.put(name, left_var + "::" + name);
- }
- for (String name : right_names) {
- varToSet.put(name, forEachVarStr);
- addToExprToNames(node, name);
- nameToJoinName.put(name, right_var + "::" + name);
- }
-
- first = true;
- for (Map.Entry entry : nameToJoinName.entrySet()) {
- if (!first) {
- pigScriptBuilder.append(",");
- }
- first = false;
- pigScriptBuilder.append(entry.getValue()).append(" AS ").append(entry.getKey()).append(":chararray ");
- }
- pigScriptBuilder.append(";\n");
-
- exprToVar.put(node, forEachVarStr);
- i++;
- }
-
- @Override
- public void meet(Projection node) throws RuntimeException {
- super.meet(node);
- ProjectionElemList list = node.getProjectionElemList();
- String set = null;
- StringBuilder projList = new StringBuilder();
- boolean first = true;
- //TODO: we do not support projections from multiple pig statements yet
- for (String name : list.getTargetNames()) {
- set = varToSet.get(name); //TODO: overwrite
- if (set == null) {
- throw new IllegalArgumentException("Have not found any pig logic for name[" + name + "]");
- }
- if (!first) {
- projList.append(",");
- }
- first = false;
- projList.append(name);
- }
- if (set == null)
- throw new IllegalArgumentException("Have not found any pig logic for projection elements [" + list.getTargetNames() + "]");
- //SUBORG = FOREACH SUBORG_L GENERATE dept, univ;
- pigScriptBuilder.append("PROJ = FOREACH ").append(set).append(" GENERATE ").append(projList.toString()).append(";\n");
- }
-
- @Override
- public void meet(Slice node) throws RuntimeException {
- super.meet(node);
- long limit = node.getLimit();
- //PROJ = LIMIT PROJ 10;
- pigScriptBuilder.append("PROJ = LIMIT PROJ ").append(limit).append(";\n");
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getUser() {
- return user;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-
- public String getZk() {
- return zk;
- }
-
- public void setZk(String zk) {
- this.zk = zk;
- }
-
- public String getInstance() {
- return instance;
- }
-
- public void setInstance(String instance) {
- this.instance = instance;
- }
-
- public String getTablePrefix() {
- return tablePrefix;
- }
-
- public void setTablePrefix(String tablePrefix) {
- this.tablePrefix = tablePrefix;
- }
-
- public String getPigScript() {
- return pigScriptBuilder.toString();
- }
-
- protected String getVarValue(Var var) {
- if (var == null) {
- return "";
- } else {
- Value value = var.getValue();
- if (value == null) {
- return "";
- }
- if (value instanceof URI) {
- return "<" + value.stringValue() + ">";
- }
- if (value instanceof Literal) {
- Literal lit = (Literal) value;
- if (lit.getDatatype() == null) {
- //string
- return "\\'" + value.stringValue() + "\\'";
- }
- }
- return value.stringValue();
- }
-
- }
-}
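
To see what the visitor emits, a sketch that drives it directly with one parsed statement pattern (in normal use SparqlQueryPigEngine does this); the URIs and connection values are placeholders:

    import org.openrdf.query.parser.ParsedQuery;
    import org.openrdf.query.parser.sparql.SPARQLParser;

    import mvm.rya.accumulo.pig.SparqlToPigTransformVisitor;

    public class VisitorSketch {
        public static void main(String[] args) throws Exception {
            ParsedQuery pq = new SPARQLParser().parseQuery(
                    "SELECT ?o WHERE { <urn:ex:a> <urn:ex:b> ?o }", null);

            SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
            visitor.setTablePrefix("rya_");
            visitor.setInstance("myinstance");
            visitor.setZk("zoo1:2181");
            visitor.setUser("root");
            visitor.setPassword("secret");

            // meet(StatementPattern) appends a load of relation A via
            // StatementPatternStorage; meet(Projection) then appends the
            // final "PROJ = FOREACH A GENERATE o;" line.
            pq.getTupleExpr().visit(visitor);
            System.out.println(visitor.getPigScript());
        }
    }
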
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
deleted file mode 100644
index 9ec9d45..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java
+++ /dev/null
@@ -1,304 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.persist.RyaDAOException;
-import mvm.rya.api.query.strategy.ByteRange;
-import mvm.rya.api.query.strategy.TriplePatternStrategy;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.rdftriplestore.inference.InferenceEngine;
-import mvm.rya.rdftriplestore.inference.InferenceEngineException;
-
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.openrdf.model.Resource;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.vocabulary.RDF;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.algebra.Var;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-/**
- * A LoadFunc that loads the Rya statements matching a single SPARQL
- * statement pattern, optionally widening the scan with
- * subClassOf/subPropertyOf inferred ranges.
- */
-public class StatementPatternStorage extends AccumuloStorage {
- private static final Log logger = LogFactory.getLog(StatementPatternStorage.class);
- protected TABLE_LAYOUT layout;
- protected String subject = "?s";
- protected String predicate = "?p";
- protected String object = "?o";
- protected String context;
- private Value subject_value;
- private Value predicate_value;
- private Value object_value;
-
- private RyaTripleContext ryaContext;
-
- /**
- * whether to turn inferencing on or off
- */
- private boolean infer = false;
-
- public StatementPatternStorage() {
- if (super.conf != null){
- ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(super.conf));
- }
- else {
- ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
- }
-
- }
-
- private Value getValue(Var subjectVar) {
- return subjectVar.hasValue() ? subjectVar.getValue() : null;
- }
-
- @Override
- public void setLocation(String location, Job job) throws IOException {
- super.setLocation(location, job);
- }
-
- @Override
- protected void setLocationFromUri(String uri, Job job) throws IOException {
- super.setLocationFromUri(uri, job);
- // ex: accumulo://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c&context=c&infer=true
- addStatementPatternRange(subject, predicate, object, context);
- if (infer) {
- addInferredRanges(table, job);
- }
-
- if (layout == null || ranges.size() == 0)
- throw new IllegalArgumentException("Range and/or layout is missing. Check the query");
- table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table);
- tableName = new Text(table);
- }
-
- @Override
- protected void addLocationFromUriPart(String[] pair) {
- if (pair[0].equals("subject")) {
- this.subject = pair[1];
- } else if (pair[0].equals("predicate")) {
- this.predicate = pair[1];
- } else if (pair[0].equals("object")) {
- this.object = pair[1];
- } else if (pair[0].equals("context")) {
- this.context = pair[1];
- } else if (pair[0].equals("infer")) {
- this.infer = Boolean.parseBoolean(pair[1]);
- }
- }
-
- protected void addStatementPatternRange(String subj, String pred, String obj, String ctxt) throws IOException {
- logger.info("Adding statement pattern[subject:" + subj + ", predicate:" + pred + ", object:" + obj + ", context:" + ctxt + "]");
- StringBuilder sparqlBuilder = new StringBuilder();
- sparqlBuilder.append("select * where {\n");
- if (ctxt != null) {
- /**
- * select * where {
- GRAPH ?g {
- <http://www.example.org/exampleDocument#Monica> ?p ?o.
- }
- }
- */
- sparqlBuilder.append("GRAPH ").append(ctxt).append(" {\n");
- }
- sparqlBuilder.append(subj).append(" ").append(pred).append(" ").append(obj).append(".\n");
- if (ctxt != null) {
- sparqlBuilder.append("}\n");
- }
- sparqlBuilder.append("}\n");
- String sparql = sparqlBuilder.toString();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Sparql statement range[" + sparql + "]");
- }
-
- QueryParser parser = new SPARQLParser();
- ParsedQuery parsedQuery = null;
- try {
- parsedQuery = parser.parseQuery(sparql, null);
- } catch (MalformedQueryException e) {
- throw new IOException(e);
- }
- parsedQuery.getTupleExpr().visitChildren(new QueryModelVisitorBase<IOException>() {
- @Override
- public void meet(StatementPattern node) throws IOException {
- Var subjectVar = node.getSubjectVar();
- Var predicateVar = node.getPredicateVar();
- Var objectVar = node.getObjectVar();
- subject_value = getValue(subjectVar);
- predicate_value = getValue(predicateVar);
- object_value = getValue(objectVar);
- Var contextVar = node.getContextVar();
- Map.Entry<TABLE_LAYOUT, Range> temp = createRange(subject_value, predicate_value, object_value);
- layout = temp.getKey();
- Range range = temp.getValue();
- addRange(range);
- if (contextVar != null && contextVar.getValue() != null) {
- String context_str = contextVar.getValue().stringValue();
- addColumnPair(context_str, "");
- }
- }
- });
- }
-
- protected Map.Entry<TABLE_LAYOUT, Range> createRange(Value s_v, Value p_v, Value o_v) throws IOException {
- RyaURI subject_rya = RdfToRyaConversions.convertResource((Resource) s_v);
- RyaURI predicate_rya = RdfToRyaConversions.convertURI((URI) p_v);
- RyaType object_rya = RdfToRyaConversions.convertValue(o_v);
- TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null);
- if (strategy == null)
- return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range());
- Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null);
- ByteRange byteRange = entry.getValue();
- return new RdfCloudTripleStoreUtils.CustomEntry<mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT, Range>(
- entry.getKey(), new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()))
- );
- }
-
- protected void addInferredRanges(String tablePrefix, Job job) throws IOException {
- logger.info("Adding inferences to statement pattern[subject:" + subject_value + ", predicate:" + predicate_value + ", object:" + object_value + "]");
- //inference engine
- AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
- InferenceEngine inferenceEngine = new InferenceEngine();
- try {
- AccumuloRdfConfiguration rdfConf = new AccumuloRdfConfiguration(job.getConfiguration());
- rdfConf.setTablePrefix(tablePrefix);
- ryaDAO.setConf(rdfConf);
- try {
- if (!mock) {
- ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes()));
- } else {
- ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes()));
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- ryaDAO.init();
- inferenceEngine.setConf(rdfConf);
- inferenceEngine.setRyaDAO(ryaDAO);
- inferenceEngine.setSchedule(false);
- inferenceEngine.init();
- //is it subclassof or subpropertyof
- if (RDF.TYPE.equals(predicate_value)) {
- //try subclassof
- Collection<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubClassOfGraph(), (URI) object_value);
- if (parents != null && parents.size() > 0) {
- //subclassof relationships found
- //don't add self, that will happen anyway later
- //add all relationships
- for (URI parent : parents) {
- Map.Entry<TABLE_LAYOUT, Range> temp =
- createRange(subject_value, predicate_value, parent);
- Range range = temp.getValue();
- if (logger.isDebugEnabled()) {
- logger.debug("Found subClassOf relationship [type:" + object_value + " is subClassOf:" + parent + "]");
- }
- addRange(range);
- }
- }
- } else if (predicate_value != null) {
- //subpropertyof check
- Set<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubPropertyOfGraph(), (URI) predicate_value);
- for (URI parent : parents) {
- Map.Entry<TABLE_LAYOUT, Range> temp =
- createRange(subject_value, parent, object_value);
- Range range = temp.getValue();
- if (logger.isDebugEnabled()) {
- logger.debug("Found subPropertyOf relationship [type:" + predicate_value + " is subPropertyOf:" + parent + "]");
- }
- addRange(range);
- }
- }
- } catch (Exception e) {
- logger.error("Exception in adding inferred ranges", e);
- throw new IOException(e);
- } finally {
- if (inferenceEngine != null) {
- try {
- inferenceEngine.destroy();
- } catch (InferenceEngineException e) {
- logger.error("Exception closing InferenceEngine", e);
- }
- }
- if (ryaDAO != null) {
- try {
- ryaDAO.destroy();
- } catch (RyaDAOException e) {
- logger.error("Exception closing ryadao", e);
- }
- }
- }
- }
-
- @Override
- public Tuple getNext() throws IOException {
- try {
- if (reader.nextKeyValue()) {
- Key key = (Key) reader.getCurrentKey();
- org.apache.accumulo.core.data.Value value = (org.apache.accumulo.core.data.Value) reader.getCurrentValue();
- RyaStatement ryaStatement = ryaContext.deserializeTriple(layout, new TripleRow(key.getRow().getBytes(),
- key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
-
- Tuple tuple = TupleFactory.getInstance().newTuple(4);
- tuple.set(0, ryaStatement.getSubject().getData());
- tuple.set(1, ryaStatement.getPredicate().getData());
- tuple.set(2, ryaStatement.getObject().getData());
- tuple.set(3, ryaStatement.getContext() != null ? ryaStatement.getContext().getData() : null);
- return tuple;
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
- return null;
- }
-}
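
A note on the inference expansion above: when the statement pattern is "?s rdf:type :C", addInferredRanges adds one extra scan range per class the subClassOf graph relates to :C (its subclasses, under RDFS semantics, since an instance of a subclass of :C is also an instance of :C); the subPropertyOf branch does the same per predicate. A minimal, self-contained sketch of that expansion, using plain Java collections and hypothetical class names rather than the Rya/OpenRDF types:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SubClassExpansionSketch {
        public static void main(String[] args) {
            // child -> direct superclasses, i.e. the subClassOf edges
            Map<String, Set<String>> subClassOf = new HashMap<String, Set<String>>();
            subClassOf.put("Student", new HashSet<String>(Arrays.asList("Person")));
            subClassOf.put("Employee", new HashSet<String>(Arrays.asList("Person")));
            subClassOf.put("Intern", new HashSet<String>(Arrays.asList("Student", "Employee")));

            // collect every class whose instances are also instances of the queried type
            String queriedType = "Person";
            Set<String> subclasses = new HashSet<String>();
            Deque<String> work = new ArrayDeque<String>();
            work.push(queriedType);
            while (!work.isEmpty()) {
                String current = work.pop();
                for (Map.Entry<String, Set<String>> e : subClassOf.entrySet()) {
                    if (e.getValue().contains(current) && subclasses.add(e.getKey())) {
                        work.push(e.getKey());
                    }
                }
            }
            System.out.println("extra ranges for rdf:type " + queriedType + ": " + subclasses);
        }
    }

Each class printed there corresponds to one extra createRange/addRange call, on top of the range built for the queried type itself.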
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
deleted file mode 100644
index 4b458b6..0000000
--- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package mvm.rya.accumulo.pig.optimizer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.Dataset;
-import org.openrdf.query.algebra.*;
-import org.openrdf.query.algebra.evaluation.QueryOptimizer;
-import org.openrdf.query.algebra.evaluation.impl.EvaluationStatistics;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
-import org.openrdf.query.algebra.helpers.StatementPatternCollector;
-
-import java.util.*;
-
-/**
- * A query optimizer that re-orders nested Joins by estimated cardinality,
- * preferring join arguments that share variables with the argument chosen
- * just before them.
- */
-public class SimilarVarJoinOptimizer implements QueryOptimizer {
-
- protected final EvaluationStatistics statistics;
-
- public SimilarVarJoinOptimizer() {
- this(new EvaluationStatistics());
- }
-
- public SimilarVarJoinOptimizer(EvaluationStatistics statistics) {
- this.statistics = statistics;
- }
-
- /**
- * Reorders the join arguments in the supplied expression from more to
- * less specific, preferring arguments that share variables with the
- * previously selected one. The dataset and bindings arguments are unused.
- *
- * @param tupleExpr the expression whose joins are reordered in place
- */
- public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
- tupleExpr.visit(new JoinVisitor());
- }
-
- protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
-
- Set<String> boundVars = new HashSet<String>();
-
- @Override
- public void meet(LeftJoin leftJoin) {
- leftJoin.getLeftArg().visit(this);
-
- Set<String> origBoundVars = boundVars;
- try {
- boundVars = new HashSet<String>(boundVars);
- boundVars.addAll(leftJoin.getLeftArg().getBindingNames());
-
- leftJoin.getRightArg().visit(this);
- } finally {
- boundVars = origBoundVars;
- }
- }
-
- @Override
- public void meet(Join node) {
- Set<String> origBoundVars = boundVars;
- try {
- boundVars = new HashSet<String>(boundVars);
-
- // Recursively get the join arguments
- List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
-
- // Build maps of cardinalities and vars per tuple expression
- Map<TupleExpr, Double> cardinalityMap = new HashMap<TupleExpr, Double>();
-
- for (TupleExpr tupleExpr : joinArgs) {
- double cardinality = statistics.getCardinality(tupleExpr);
- cardinalityMap.put(tupleExpr, cardinality);
- }
-
- // Reorder the (recursive) join arguments to a more optimal sequence
- List<TupleExpr> orderedJoinArgs = new ArrayList<TupleExpr>(joinArgs.size());
- TupleExpr last = null;
- while (!joinArgs.isEmpty()) {
- TupleExpr tupleExpr = selectNextTupleExpr(joinArgs, cardinalityMap, last);
- if (tupleExpr == null) {
- break;
- }
-
- joinArgs.remove(tupleExpr);
- orderedJoinArgs.add(tupleExpr);
- last = tupleExpr;
-
- // Recursively optimize join arguments
- tupleExpr.visit(this);
-
- boundVars.addAll(tupleExpr.getBindingNames());
- }
-
- // Build new join hierarchy: each selected argument is joined onto
- // the right of the tree built so far, yielding a left-deep join tree
- TupleExpr replacement = orderedJoinArgs.get(0);
- for (int i = 1; i < orderedJoinArgs.size(); i++) {
- replacement = new Join(replacement, orderedJoinArgs.get(i));
- }
-
- // Replace old join hierarchy
- node.replaceWith(replacement);
- } finally {
- boundVars = origBoundVars;
- }
- }
-
- protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) {
- if (tupleExpr instanceof Join) {
- Join join = (Join) tupleExpr;
- getJoinArgs(join.getLeftArg(), joinArgs);
- getJoinArgs(join.getRightArg(), joinArgs);
- } else {
- joinArgs.add(tupleExpr);
- }
-
- return joinArgs;
- }
-
- protected List<Var> getStatementPatternVars(TupleExpr tupleExpr) {
- if(tupleExpr == null)
- return null;
- List<StatementPattern> stPatterns = StatementPatternCollector.process(tupleExpr);
- List<Var> varList = new ArrayList<Var>(stPatterns.size() * 4);
- for (StatementPattern sp : stPatterns) {
- sp.getVars(varList);
- }
- return varList;
- }
-
- protected <M extends Map<Var, Integer>> M getVarFreqMap(List<Var> varList, M varFreqMap) {
- for (Var var : varList) {
- Integer freq = varFreqMap.get(var);
- freq = (freq == null) ? 1 : freq + 1;
- varFreqMap.put(var, freq);
- }
- return varFreqMap;
- }
-
- /**
- * Selects from a list of tuple expressions the next one that should be
- * evaluated: among the expressions that share a variable with the last
- * selected expression (or all of them, if none do), the one with the
- * lowest estimated cardinality.
- */
- protected TupleExpr selectNextTupleExpr(List<TupleExpr> expressions,
- Map<TupleExpr, Double> cardinalityMap,
- TupleExpr last) {
- expressions = getExprsWithSameVars(expressions, last);
- TupleExpr result = expressions.get(0);
- double lowestCardinality = Double.MAX_VALUE;
-
- for (TupleExpr tupleExpr : expressions) {
- // Calculate a score for this tuple expression
- double cardinality = cardinalityMap.get(tupleExpr);
-
- if (cardinality < lowestCardinality) {
- // More specific path expression found
- lowestCardinality = cardinality;
- result = tupleExpr;
- }
- }
-
- return result;
- }
-
- protected List<TupleExpr> getExprsWithSameVars(List<TupleExpr> expressions, TupleExpr last) {
- if (last == null) {
- return expressions;
- }
- List<Var> lastVars = getStatementPatternVars(last);
- List<TupleExpr> retExprs = new ArrayList<TupleExpr>();
- for (TupleExpr tupleExpr : expressions) {
- List<Var> statementPatternVars = getStatementPatternVars(tupleExpr);
- statementPatternVars.retainAll(lastVars);
- if (!statementPatternVars.isEmpty()) {
- retExprs.add(tupleExpr);
- }
- }
- if (retExprs.isEmpty()) {
- return expressions;
- }
- return retExprs;
- }
-
- }
-}
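
To see the effect of the optimizer above, it helps to run it over a hand-built algebra tree. A sketch, assuming the Sesame 2.x classes the file already imports plus ValueFactoryImpl and QueryRoot (the QueryRoot wrapper gives the top join a parent, which replaceWith() needs):

    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.query.algebra.Join;
    import org.openrdf.query.algebra.QueryRoot;
    import org.openrdf.query.algebra.StatementPattern;
    import org.openrdf.query.algebra.TupleExpr;
    import org.openrdf.query.algebra.Var;

    import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;

    public class JoinOrderSketch {
        public static void main(String[] args) {
            ValueFactory vf = ValueFactoryImpl.getInstance();
            // ?x :p1 ?y   ?y :p2 ?z   ?a :p3 ?b  -- the third pattern shares no variables
            StatementPattern sp1 = new StatementPattern(
                    new Var("x"), new Var("c1", vf.createURI("urn:p1")), new Var("y"));
            StatementPattern sp2 = new StatementPattern(
                    new Var("y"), new Var("c2", vf.createURI("urn:p2")), new Var("z"));
            StatementPattern sp3 = new StatementPattern(
                    new Var("a"), new Var("c3", vf.createURI("urn:p3")), new Var("b"));

            // start with the unrelated pattern joined between the two related ones
            TupleExpr root = new QueryRoot(new Join(new Join(sp1, sp3), sp2));
            new SimilarVarJoinOptimizer().optimize(root, null, null);

            // sp1 and sp2, which share ?y, should now be adjacent, with sp3 last
            System.out.println(root);
        }
    }

With the default EvaluationStatistics all three patterns get the same cardinality estimate, so the reordering here is driven entirely by the shared-variable filter in getExprsWithSameVars.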
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
deleted file mode 100644
index 119ccb1..0000000
--- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java
+++ /dev/null
@@ -1,284 +0,0 @@
-package mvm.rya.accumulo.pig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
- import org.apache.accumulo.core.client.BatchWriter;
- import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.admin.SecurityOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.TaskAttemptID;
- import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.pig.data.Tuple;
-
-/**
- * Tests for {@link AccumuloStorage}: reads rows back through the Pig
- * LoadFunc, exercising single and multiple ranges, column filters,
- * whole-row ranges, and scan authorizations against a mock instance.
- */
-public class AccumuloStorageTest extends TestCase {
-
- private String user = "user";
- private String pwd = "pwd";
- private String instance = "myinstance";
- private String table = "testTable";
- private Connector connector;
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- connector = new MockInstance(instance).getConnector(user, new PasswordToken(pwd.getBytes()));
- connector.tableOperations().create(table);
- SecurityOperations secOps = connector.securityOperations();
- secOps.createLocalUser(user, new PasswordToken(pwd.getBytes()));
- secOps.grantTablePermission(user, table, TablePermission.READ);
- secOps.grantTablePermission(user, table, TablePermission.WRITE);
- }
-
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- connector.tableOperations().delete(table);
- }
-
- public void testSimpleOutput() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("row");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|z&mock=true";
- AccumuloStorage storage = createAccumuloStorage(location);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(1, count);
- }
-
- public void testRange() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("a");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- row = new Mutation("b");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- row = new Mutation("d");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true";
- AccumuloStorage storage = createAccumuloStorage(location);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(2, count);
- }
-
- public void testMultipleRanges() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("a");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- row = new Mutation("b");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- row = new Mutation("d");
- row.put("cf", "cq", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&range=d|e&mock=true";
- List<AccumuloStorage> storages = createAccumuloStorages(location);
- assertEquals(2, storages.size());
- AccumuloStorage storage = storages.get(0);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(2, count);
- storage = storages.get(1);
- count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(1, count);
- }
-
- public void testColumns() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("a");
- row.put("cf1", "cq", new Value(new byte[0]));
- row.put("cf2", "cq", new Value(new byte[0]));
- row.put("cf3", "cq1", new Value(new byte[0]));
- row.put("cf3", "cq2", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&columns=cf1,cf3|cq1&mock=true";
- AccumuloStorage storage = createAccumuloStorage(location);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(2, count);
- }
-
- public void testWholeRowRange() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("a");
- row.put("cf1", "cq", new Value(new byte[0]));
- row.put("cf2", "cq", new Value(new byte[0]));
- row.put("cf3", "cq1", new Value(new byte[0]));
- row.put("cf3", "cq2", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a&mock=true";
- AccumuloStorage storage = createAccumuloStorage(location);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(4, count);
- }
-
- public void testAuths() throws Exception {
- BatchWriter batchWriter = connector.createBatchWriter(table, new BatchWriterConfig());
- Mutation row = new Mutation("a");
- row.put("cf1", "cq1", new ColumnVisibility("A"), new Value(new byte[0]));
- row.put("cf2", "cq2", new Value(new byte[0]));
- batchWriter.addMutation(row);
- batchWriter.flush();
- batchWriter.close();
-
- String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true";
- AccumuloStorage storage = createAccumuloStorage(location);
- int count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(1, count);
-
- location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&auths=A&mock=true";
- storage = createAccumuloStorage(location);
- count = 0;
- while (true) {
- Tuple next = storage.getNext();
- if (next == null)
- break;
- assertEquals(6, next.size());
- count++;
- }
- assertEquals(2, count);
- }
-
- protected AccumuloStorage createAccumuloStorage(String location) throws IOException, InterruptedException {
- List<AccumuloStorage> accumuloStorages = createAccumuloStorages(location);
- if (accumuloStorages.size() > 0) {
- return accumuloStorages.get(0);
- }
- return null;
- }
-
- protected List<AccumuloStorage> createAccumuloStorages(String location) throws IOException, InterruptedException {
- List<AccumuloStorage> accumuloStorages = new ArrayList<AccumuloStorage>();
- AccumuloStorage storage = new AccumuloStorage();
- InputFormat inputFormat = storage.getInputFormat();
- Job job = Job.getInstance(new Configuration());
- storage.setLocation(location, job);
- List<InputSplit> splits = inputFormat.getSplits(job);
- assertNotNull(splits);
-
- for (InputSplit inputSplit : splits) {
- storage = new AccumuloStorage();
- job = Job.getInstance(new Configuration());
- storage.setLocation(location, job);
- TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
- new TaskAttemptID("jtid", 0, false, 0, 0));
- RecordReader recordReader = inputFormat.createRecordReader(inputSplit,
- taskAttemptContext);
- recordReader.initialize(inputSplit, taskAttemptContext);
-
- storage.prepareToRead(recordReader, null);
- accumuloStorages.add(storage);
- }
- return accumuloStorages;
- }
-}
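
The read-and-count loop repeated in every test above could be factored into one helper. A sketch, assuming it lives in the same package so AccumuloStorage resolves (the six tuple fields asserted are, presumably, row / column family / column qualifier / visibility / timestamp / value):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pig.data.Tuple;

    public final class StorageTestUtil {
        private StorageTestUtil() {
        }

        // pulls every remaining tuple out of an already-prepared AccumuloStorage
        public static List<Tuple> drain(AccumuloStorage storage) throws IOException {
            List<Tuple> tuples = new ArrayList<Tuple>();
            Tuple next;
            while ((next = storage.getNext()) != null) {
                tuples.add(next);
            }
            return tuples;
        }
    }

Each while loop then collapses to a single assertEquals on drain(storage).size(), with the per-tuple size check done in one pass over the returned list.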