You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@rya.apache.org by ca...@apache.org on 2017/10/12 18:43:24 UTC
[03/11] incubator-rya git commit: RYA-401 Fixed all default charset bugs. Closes #243.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
index a1c84aa..5adb893 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java
@@ -22,19 +22,9 @@ package org.apache.rya.accumulo.mr.tools;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Date;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRdfConstants;
-import org.apache.rya.accumulo.mr.AbstractAccumuloMRTool;
-import org.apache.rya.accumulo.mr.MRUtils;
-import org.apache.rya.api.RdfCloudTripleStoreConstants;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.resolver.RyaTripleContext;
-import org.apache.rya.api.resolver.triple.TripleRow;
-import org.apache.rya.api.resolver.triple.TripleRowResolverException;
-
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -49,6 +39,16 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.AbstractAccumuloMRTool;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
@@ -64,13 +64,14 @@ import com.google.common.io.ByteStreams;
* Time: 10:39:40 AM
* @deprecated
*/
+@Deprecated
public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
- public static void main(String[] args) {
+ public static void main(final String[] args) {
try {
ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
}
}
@@ -80,13 +81,13 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
*/
@Override
- public int run(String[] strings) throws Exception {
+ public int run(final String[] strings) throws Exception {
conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
//initialize
init();
- Job job = new Job(conf);
+ final Job job = new Job(conf);
job.setJarByClass(AccumuloRdfCountTool.class);
setupAccumuloInput(job);
@@ -102,16 +103,16 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
job.setCombinerClass(CountPiecesCombiner.class);
job.setReducerClass(CountPiecesReducer.class);
- String outputTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
+ final String outputTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
setupAccumuloOutput(job, outputTable);
// Submit the job
- Date startTime = new Date();
+ final Date startTime = new Date();
System.out.println("Job started: " + startTime);
- int exitCode = job.waitForCompletion(true) ? 0 : 1;
+ final int exitCode = job.waitForCompletion(true) ? 0 : 1;
if (exitCode == 0) {
- Date end_time = new Date();
+ final Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took "
+ (end_time.getTime() - startTime.getTime()) / 1000
@@ -131,38 +132,39 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
ValueFactoryImpl vf = new ValueFactoryImpl();
- private Text keyOut = new Text();
- private LongWritable valOut = new LongWritable(1);
+ private final Text keyOut = new Text();
+ private final LongWritable valOut = new LongWritable(1);
private RyaTripleContext ryaContext;
@Override
- protected void setup(Context context) throws IOException, InterruptedException {
+ protected void setup(final Context context) throws IOException, InterruptedException {
super.setup(context);
- Configuration conf = context.getConfiguration();
+ final Configuration conf = context.getConfiguration();
tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
}
@Override
- protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+ protected void map(final Key key, final Value value, final Context context) throws IOException, InterruptedException {
try {
- RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
+ final RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
//count each piece subject, pred, object
- String subj = statement.getSubject().getData();
- String pred = statement.getPredicate().getData();
+ final String subj = statement.getSubject().getData();
+ final String pred = statement.getPredicate().getData();
// byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
- RyaURI scontext = statement.getContext();
- boolean includesContext = scontext != null;
- String scontext_str = (includesContext) ? scontext.getData() : null;
+ final RyaURI scontext = statement.getContext();
+ final boolean includesContext = scontext != null;
+ final String scontext_str = (includesContext) ? scontext.getData() : null;
ByteArrayDataOutput output = ByteStreams.newDataOutput();
output.writeUTF(subj);
output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
output.writeBoolean(includesContext);
- if (includesContext)
+ if (includesContext) {
output.writeUTF(scontext_str);
+ }
keyOut.set(output.toByteArray());
context.write(keyOut, valOut);
@@ -170,11 +172,12 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
output.writeUTF(pred);
output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
output.writeBoolean(includesContext);
- if (includesContext)
+ if (includesContext) {
output.writeUTF(scontext_str);
+ }
keyOut.set(output.toByteArray());
context.write(keyOut, valOut);
- } catch (TripleRowResolverException e) {
+ } catch (final TripleRowResolverException e) {
throw new IOException(e);
}
}
@@ -182,21 +185,22 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
- private LongWritable valOut = new LongWritable();
+ private final LongWritable valOut = new LongWritable();
// TODO: can still add up to be large I guess
// any count lower than this does not need to be saved
public static final int TOO_LOW = 2;
@Override
- protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException {
long count = 0;
- for (LongWritable lw : values) {
+ for (final LongWritable lw : values) {
count += lw.get();
}
- if (count <= TOO_LOW)
+ if (count <= TOO_LOW) {
return;
+ }
valOut.set(count);
context.write(key, valOut);
@@ -218,38 +222,40 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool
private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
@Override
- protected void setup(Context context) throws IOException, InterruptedException {
+ protected void setup(final Context context) throws IOException, InterruptedException {
super.setup(context);
tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP);
- if (cv_s != null)
+ if (cv_s != null) {
cv = new ColumnVisibility(cv_s);
+ }
}
@Override
- protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException {
long count = 0;
- for (LongWritable lw : values) {
+ for (final LongWritable lw : values) {
count += lw.get();
}
- if (count <= TOO_LOW)
+ if (count <= TOO_LOW) {
return;
+ }
- ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
- String v = badi.readUTF();
+ final ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
+ final String v = badi.readUTF();
cat_txt.set(badi.readUTF());
Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
- boolean includesContext = badi.readBoolean();
+ final boolean includesContext = badi.readBoolean();
if (includesContext) {
columnQualifier = new Text(badi.readUTF());
}
row.set(v);
- Mutation m = new Mutation(row);
- v_out.set((count + "").getBytes());
+ final Mutation m = new Mutation(row);
+ v_out.set((count + "").getBytes(StandardCharsets.UTF_8));
m.put(cat_txt, columnQualifier, cv, v_out);
context.write(table, m);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java
----------------------------------------------------------------------
diff --git a/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java b/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java
index e570ce5..eba4b3d 100644
--- a/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java
+++ b/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java
@@ -1,5 +1,18 @@
package org.apache.rya.camel.cbsail;
+import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.CONF_INFER;
+import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH;
+import static org.apache.rya.camel.cbsail.CbSailComponent.SPARQL_QUERY_PROP;
+import static org.apache.rya.camel.cbsail.CbSailComponent.valueFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -8,9 +21,9 @@ package org.apache.rya.camel.cbsail;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,28 +37,27 @@ package org.apache.rya.camel.cbsail;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.openrdf.model.Statement;
-import org.openrdf.query.*;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;
import org.openrdf.rio.RDFHandlerException;
-import java.io.ByteArrayOutputStream;
-import java.util.*;
-
-import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.*;
-import static org.apache.rya.camel.cbsail.CbSailComponent.SPARQL_QUERY_PROP;
-import static org.apache.rya.camel.cbsail.CbSailComponent.valueFactory;
-
/**
*/
public class CbSailProducer extends DefaultProducer {
private RepositoryConnection connection;
- private CbSailEndpoint.CbSailOutput queryOutput = CbSailEndpoint.CbSailOutput.BINARY;
+ private final CbSailEndpoint.CbSailOutput queryOutput = CbSailEndpoint.CbSailOutput.BINARY;
- public CbSailProducer(CbSailEndpoint endpoint) {
+ public CbSailProducer(final CbSailEndpoint endpoint) {
super(endpoint);
}
@@ -53,78 +65,83 @@ public class CbSailProducer extends DefaultProducer {
public void process(final Exchange exchange) throws Exception {
//If a query is set in the header or uri, use it
Collection<String> queries = new ArrayList<String>();
- Collection tmp = exchange.getIn().getHeader(SPARQL_QUERY_PROP, Collection.class);
+ final Collection tmp = exchange.getIn().getHeader(SPARQL_QUERY_PROP, Collection.class);
if (tmp != null) {
queries = tmp;
} else {
- String query = exchange.getIn().getHeader(SPARQL_QUERY_PROP, String.class);
+ final String query = exchange.getIn().getHeader(SPARQL_QUERY_PROP, String.class);
if (query != null) {
queries.add(query);
}
}
- if (queries.size() > 0)
+ if (queries.size() > 0) {
sparqlQuery(exchange, queries);
- else
+ } else {
inputTriples(exchange);
+ }
}
- protected void inputTriples(Exchange exchange) throws RepositoryException {
- Object body = exchange.getIn().getBody();
+ protected void inputTriples(final Exchange exchange) throws RepositoryException {
+ final Object body = exchange.getIn().getBody();
if (body instanceof Statement) {
//save statement
inputStatement((Statement) body);
} else if (body instanceof List) {
//save list of statements
- List lst = (List) body;
- for (Object obj : lst) {
- if (obj instanceof Statement)
+ final List lst = (List) body;
+ for (final Object obj : lst) {
+ if (obj instanceof Statement) {
inputStatement((Statement) obj);
+ }
}
}
connection.commit();
exchange.getOut().setBody(Boolean.TRUE);
}
- protected void inputStatement(Statement stmt) throws RepositoryException {
+ protected void inputStatement(final Statement stmt) throws RepositoryException {
connection.add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject());
}
- protected void sparqlQuery(Exchange exchange, Collection<String> queries) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException, RDFHandlerException {
+ protected void sparqlQuery(final Exchange exchange, final Collection<String> queries) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException, RDFHandlerException {
- List list = new ArrayList();
- for (String query : queries) {
+ final List list = new ArrayList();
+ for (final String query : queries) {
// Long startTime = exchange.getIn().getHeader(START_TIME_QUERY_PROP, Long.class);
// Long ttl = exchange.getIn().getHeader(TTL_QUERY_PROP, Long.class);
- String auth = exchange.getIn().getHeader(CONF_QUERY_AUTH, String.class);
- Boolean infer = exchange.getIn().getHeader(CONF_INFER, Boolean.class);
+ final String auth = exchange.getIn().getHeader(CONF_QUERY_AUTH, String.class);
+ final Boolean infer = exchange.getIn().getHeader(CONF_INFER, Boolean.class);
- Object output = performSelect(query, auth, infer);
+ final Object output = performSelect(query, auth, infer);
if (queries.size() == 1) {
exchange.getOut().setBody(output);
return;
- } else
+ } else {
list.add(output);
+ }
}
exchange.getOut().setBody(list);
}
- protected Object performSelect(String query, String auth, Boolean infer) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException {
- TupleQuery tupleQuery = connection.prepareTupleQuery(
+ protected Object performSelect(final String query, final String auth, final Boolean infer) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException {
+ final TupleQuery tupleQuery = connection.prepareTupleQuery(
QueryLanguage.SPARQL, query);
- if (auth != null && auth.length() > 0)
+ if (auth != null && auth.length() > 0) {
tupleQuery.setBinding(CONF_QUERY_AUTH, valueFactory.createLiteral(auth));
- if (infer != null)
+ }
+ if (infer != null) {
tupleQuery.setBinding(CONF_INFER, valueFactory.createLiteral(infer));
+ }
if (CbSailEndpoint.CbSailOutput.BINARY.equals(queryOutput)) {
final List listOutput = new ArrayList();
- TupleQueryResultHandlerBase handler = new TupleQueryResultHandlerBase() {
+ final TupleQueryResultHandlerBase handler = new TupleQueryResultHandlerBase() {
@Override
- public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException {
- Map<String, String> map = new HashMap<String, String>();
- for (String s : bindingSet.getBindingNames()) {
+ public void handleSolution(final BindingSet bindingSet) throws TupleQueryResultHandlerException {
+ final Map<String, String> map = new HashMap<String, String>();
+ for (final String s : bindingSet.getBindingNames()) {
map.put(s, bindingSet.getBinding(s).getValue().stringValue());
}
listOutput.add(map);
@@ -133,10 +150,10 @@ public class CbSailProducer extends DefaultProducer {
tupleQuery.evaluate(handler);
return listOutput;
} else if (CbSailEndpoint.CbSailOutput.XML.equals(queryOutput)) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SPARQLResultsXMLWriter sparqlWriter = new SPARQLResultsXMLWriter(baos);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final SPARQLResultsXMLWriter sparqlWriter = new SPARQLResultsXMLWriter(baos);
tupleQuery.evaluate(sparqlWriter);
- return new String(baos.toByteArray());
+ return new String(baos.toByteArray(), StandardCharsets.UTF_8);
} else {
throw new IllegalArgumentException("Query Output[" + queryOutput + "] is not recognized");
}
@@ -164,7 +181,7 @@ public class CbSailProducer extends DefaultProducer {
@Override
protected void doStart() throws Exception {
- CbSailEndpoint cbSailEndpoint = (CbSailEndpoint) getEndpoint();
+ final CbSailEndpoint cbSailEndpoint = (CbSailEndpoint) getEndpoint();
connection = cbSailEndpoint.getSailRepository().getConnection();
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
index ac151d9..c97c717 100644
--- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
+++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
@@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@@ -34,8 +35,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
@@ -110,8 +111,8 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
return null;
}
- Key key = (Key) reader.getCurrentKey();
- Value value = (Value) reader.getCurrentValue();
+ final Key key = reader.getCurrentKey();
+ final Value value = reader.getCurrentValue();
assert key != null && value != null;
if (logger.isTraceEnabled()) {
@@ -119,7 +120,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
}
// and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(6);
+ final Tuple tuple = TupleFactory.getInstance().newTuple(6);
tuple.set(0, new DataByteArray(key.getRow().getBytes()));
tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
@@ -130,7 +131,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
logger.trace("Output tuple[" + tuple + "]");
}
return tuple;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new IOException(e.getMessage());
}
}
@@ -141,12 +142,12 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
}
@Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
+ public void prepareToRead(final RecordReader reader, final PigSplit split) {
this.reader = reader;
}
@Override
- public void setLocation(String location, Job job) throws IOException {
+ public void setLocation(final String location, final Job job) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]");
}
@@ -155,8 +156,8 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) {
try {
- AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
+ AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
+ } catch (final AccumuloSecurityException e) {
throw new RuntimeException(e);
}
AccumuloInputFormat.setInputTableName(job, table);
@@ -167,8 +168,9 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
AccumuloInputFormat.setMockInstance(job, inst);
}
}
- if (columnFamilyColumnQualifierPairs.size() > 0)
+ if (columnFamilyColumnQualifierPairs.size() > 0) {
AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
+ }
logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " +
"for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]");
@@ -178,24 +180,25 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
AccumuloInputFormat.setRanges(job, ranges);
}
- protected void setLocationFromUri(String uri, Job job) throws IOException {
+ protected void setLocationFromUri(final String uri, final Job job) throws IOException {
// ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true
try {
- if (!uri.startsWith("accumulo://"))
+ if (!uri.startsWith("accumulo://")) {
throw new Exception("Bad scheme.");
- String[] urlParts = uri.split("\\?");
+ }
+ final String[] urlParts = uri.split("\\?");
setLocationFromUriParts(urlParts);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e);
}
}
- protected void setLocationFromUriParts(String[] urlParts) {
+ protected void setLocationFromUriParts(final String[] urlParts) {
String columns = "";
if (urlParts.length > 1) {
- for (String param : urlParts[1].split("&")) {
- String[] pair = param.split("=");
+ for (final String param : urlParts[1].split("&")) {
+ final String[] pair = param.split("=");
if (pair[0].equals("instance")) {
inst = pair[1];
} else if (pair[0].equals("user")) {
@@ -209,7 +212,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
} else if (pair[0].equals("columns")) {
columns = pair[1];
} else if (pair[0].equals("range")) {
- String[] r = pair[1].split("\\|");
+ final String[] r = pair[1].split("\\|");
if (r.length == 2) {
addRange(new Range(r[0], r[1]));
} else {
@@ -221,7 +224,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
addLocationFromUriPart(pair);
}
}
- String[] parts = urlParts[0].split("/+");
+ final String[] parts = urlParts[0].split("/+");
table = parts[1];
tableName = new Text(table);
@@ -232,11 +235,11 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
}
if (!columns.equals("")) {
- for (String cfCq : columns.split(",")) {
+ for (final String cfCq : columns.split(",")) {
if (cfCq.contains("|")) {
- String[] c = cfCq.split("\\|");
- String cf = c[0];
- String cq = c[1];
+ final String[] c = cfCq.split("\\|");
+ final String cf = c[0];
+ final String cq = c[1];
addColumnPair(cf, cq);
} else {
addColumnPair(cfCq, null);
@@ -245,50 +248,53 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
}
}
- protected void addColumnPair(String cf, String cq) {
+ protected void addColumnPair(final String cf, final String cq) {
columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? new Text(cq) : null));
}
- protected void addLocationFromUriPart(String[] pair) {
+ protected void addLocationFromUriPart(final String[] pair) {
}
- protected void addRange(Range range) {
+ protected void addRange(final Range range) {
ranges.add(range);
}
@Override
- public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+ public String relativeToAbsolutePath(final String location, final Path curDir) throws IOException {
return location;
}
@Override
- public void setUDFContextSignature(String signature) {
+ public void setUDFContextSignature(final String signature) {
}
/* StoreFunc methods */
- public void setStoreFuncUDFContextSignature(String signature) {
+ @Override
+ public void setStoreFuncUDFContextSignature(final String signature) {
}
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ @Override
+ public String relToAbsPathForStoreLocation(final String location, final Path curDir) throws IOException {
return relativeToAbsolutePath(location, curDir);
}
- public void setStoreLocation(String location, Job job) throws IOException {
+ @Override
+ public void setStoreLocation(final String location, final Job job) throws IOException {
conf = job.getConfiguration();
setLocationFromUri(location, job);
if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
try {
- AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes()));
- } catch (AccumuloSecurityException e) {
+ AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
+ } catch (final AccumuloSecurityException e) {
throw new RuntimeException(e);
}
AccumuloOutputFormat.setDefaultTableName(job, table);
AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
- BatchWriterConfig config = new BatchWriterConfig();
+ final BatchWriterConfig config = new BatchWriterConfig();
config.setMaxLatency(10, TimeUnit.SECONDS);
config.setMaxMemory(10 * 1000 * 1000);
config.setMaxWriteThreads(10);
@@ -296,66 +302,70 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
}
}
+ @Override
public OutputFormat getOutputFormat() {
return new AccumuloOutputFormat();
}
- public void checkSchema(ResourceSchema schema) throws IOException {
+ @Override
+ public void checkSchema(final ResourceSchema schema) throws IOException {
// we don't care about types, they all get casted to ByteBuffers
}
- public void prepareToWrite(RecordWriter writer) {
+ @Override
+ public void prepareToWrite(final RecordWriter writer) {
this.writer = writer;
}
- public void putNext(Tuple t) throws ExecException, IOException {
- Mutation mut = new Mutation(objToText(t.get(0)));
- Text cf = objToText(t.get(1));
- Text cq = objToText(t.get(2));
+ @Override
+ public void putNext(final Tuple t) throws ExecException, IOException {
+ final Mutation mut = new Mutation(objToText(t.get(0)));
+ final Text cf = objToText(t.get(1));
+ final Text cq = objToText(t.get(2));
if (t.size() > 4) {
- Text cv = objToText(t.get(3));
- Value val = new Value(objToBytes(t.get(4)));
+ final Text cv = objToText(t.get(3));
+ final Value val = new Value(objToBytes(t.get(4)));
if (cv.getLength() == 0) {
mut.put(cf, cq, val);
} else {
mut.put(cf, cq, new ColumnVisibility(cv), val);
}
} else {
- Value val = new Value(objToBytes(t.get(3)));
+ final Value val = new Value(objToBytes(t.get(3)));
mut.put(cf, cq, val);
}
try {
writer.write(tableName, mut);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new IOException(e);
}
}
- private static Text objToText(Object o) {
+ private static Text objToText(final Object o) {
return new Text(objToBytes(o));
}
- private static byte[] objToBytes(Object o) {
+ private static byte[] objToBytes(final Object o) {
if (o instanceof String) {
- String str = (String) o;
- return str.getBytes();
+ final String str = (String) o;
+ return str.getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Long) {
- Long l = (Long) o;
- return l.toString().getBytes();
+ final Long l = (Long) o;
+ return l.toString().getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Integer) {
- Integer l = (Integer) o;
- return l.toString().getBytes();
+ final Integer l = (Integer) o;
+ return l.toString().getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Boolean) {
- Boolean l = (Boolean) o;
- return l.toString().getBytes();
+ final Boolean l = (Boolean) o;
+ return l.toString().getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Float) {
- Float l = (Float) o;
- return l.toString().getBytes();
+ final Float l = (Float) o;
+ return l.toString().getBytes(StandardCharsets.UTF_8);
} else if (o instanceof Double) {
- Double l = (Double) o;
- return l.toString().getBytes();
+ final Double l = (Double) o;
+ return l.toString().getBytes(StandardCharsets.UTF_8);
}
// TODO: handle DataBag, Map<Object, Object>, and Tuple
@@ -363,19 +373,20 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord
return ((DataByteArray) o).get();
}
- public void cleanupOnFailure(String failure, Job job) {
+ @Override
+ public void cleanupOnFailure(final String failure, final Job job) {
}
@Override
- public WritableComparable<?> getSplitComparable(InputSplit inputSplit) throws IOException {
+ public WritableComparable<?> getSplitComparable(final InputSplit inputSplit) throws IOException {
//cannot get access to the range directly
- AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
+ final AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit;
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos);
rangeInputSplit.write(out);
out.close();
- DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
- Range range = new Range();
+ final DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ final Range range = new Range();
range.readFields(stream);
stream.close();
return range;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java
index c1d426c..615c062 100644
--- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java
+++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java
@@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,8 @@ package org.apache.rya.accumulo.pig;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -48,7 +47,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
@@ -56,8 +54,6 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.ProjectionElem;
-import org.openrdf.query.algebra.ProjectionElemList;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.parser.sparql.SPARQLParser;
@@ -66,15 +62,15 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class IndexWritingTool extends Configured implements Tool {
-
+
private static final String sparql_key = "SPARQL.VALUE";
private static String cardCounter = "count";
-
-
- public static void main(String[] args) throws Exception {
-
+
+
+ public static void main(final String[] args) throws Exception {
+
ToolRunner.run(new Configuration(), new IndexWritingTool(), args);
-
+
}
@Override
@@ -90,12 +86,12 @@ public class IndexWritingTool extends Configured implements Tool {
final String passStr = args[5];
final String tablePrefix = args[6];
- String sparql = FileUtils.readFileToString(new File(sparqlFile));
+ final String sparql = FileUtils.readFileToString(new File(sparqlFile));
- Job job = new Job(getConf(), "Write HDFS Index to Accumulo");
+ final Job job = new Job(getConf(), "Write HDFS Index to Accumulo");
job.setJarByClass(this.getClass());
- Configuration jobConf = job.getConfiguration();
+ final Configuration jobConf = job.getConfiguration();
jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
setVarOrders(sparql, jobConf);
@@ -120,29 +116,29 @@ public class IndexWritingTool extends Configured implements Tool {
setAccumuloOutput(instStr, zooStr, userStr, passStr, job, tableName);
jobConf.set(sparql_key, sparql);
-
- int complete = job.waitForCompletion(true) ? 0 : -1;
+
+ final int complete = job.waitForCompletion(true) ? 0 : -1;
if (complete == 0) {
-
- String[] varOrders = jobConf.getStrings("varOrders");
- String orders = Joiner.on("\u0000").join(varOrders);
+
+ final String[] varOrders = jobConf.getStrings("varOrders");
+ final String orders = Joiner.on("\u0000").join(varOrders);
Instance inst;
-
+
if (zooStr.equals("mock")) {
inst = new MockInstance(instStr);
} else {
inst = new ZooKeeperInstance(instStr, zooStr);
}
-
- Connector conn = inst.getConnector(userStr, passStr.getBytes());
- BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1);
-
- Counters counters = job.getCounters();
- Counter c1 = counters.findCounter(cardCounter, cardCounter);
-
- Mutation m = new Mutation("~SPARQL");
- Value v = new Value(sparql.getBytes());
+
+ final Connector conn = inst.getConnector(userStr, passStr.getBytes(StandardCharsets.UTF_8));
+ final BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1);
+
+ final Counters counters = job.getCounters();
+ final Counter c1 = counters.findCounter(cardCounter, cardCounter);
+
+ final Mutation m = new Mutation("~SPARQL");
+ final Value v = new Value(sparql.getBytes(StandardCharsets.UTF_8));
m.put(new Text("" + c1.getValue()), new Text(orders), v);
bw.addMutation(m);
@@ -155,52 +151,52 @@ public class IndexWritingTool extends Configured implements Tool {
}
-
-
- public void setVarOrders(String s, Configuration conf) throws MalformedQueryException {
- SPARQLParser parser = new SPARQLParser();
- TupleExpr query = parser.parseQuery(s, null).getTupleExpr();
- List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames());
- String projElems = Joiner.on(";").join(projList);
+ public void setVarOrders(final String s, final Configuration conf) throws MalformedQueryException {
+
+ final SPARQLParser parser = new SPARQLParser();
+ final TupleExpr query = parser.parseQuery(s, null).getTupleExpr();
+
+ final List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames());
+ final String projElems = Joiner.on(";").join(projList);
conf.set("projElems", projElems);
- Pattern splitPattern1 = Pattern.compile("\n");
- Pattern splitPattern2 = Pattern.compile(",");
- String[] lines = splitPattern1.split(s);
+ final Pattern splitPattern1 = Pattern.compile("\n");
+ final Pattern splitPattern2 = Pattern.compile(",");
+ final String[] lines = splitPattern1.split(s);
- List<String> varOrders = Lists.newArrayList();
- List<String> varOrderPos = Lists.newArrayList();
+ final List<String> varOrders = Lists.newArrayList();
+ final List<String> varOrderPos = Lists.newArrayList();
int orderNum = 0;
- int projSizeSq = projList.size()*projList.size();
-
+ final int projSizeSq = projList.size()*projList.size();
+
for (String t : lines) {
if(orderNum > projSizeSq){
break;
}
-
+
String[] order = null;
if (t.startsWith("#prefix")) {
t = t.substring(7).trim();
order = splitPattern2.split(t, projList.size());
}
-
+
String tempVarOrder = "";
String tempVarOrderPos = "";
if (order != null) {
- for (String u : order) {
+ for (final String u : order) {
if (tempVarOrder.length() == 0) {
tempVarOrder = u.trim();
} else {
tempVarOrder = tempVarOrder + ";" + u.trim();
}
- int pos = projList.indexOf(u.trim());
+ final int pos = projList.indexOf(u.trim());
if (pos < 0) {
throw new IllegalArgumentException("Invalid variable order!");
} else {
@@ -215,17 +211,17 @@ public class IndexWritingTool extends Configured implements Tool {
varOrders.add(tempVarOrder);
varOrderPos.add(tempVarOrderPos);
}
-
+
if(tempVarOrder.length() > 0) {
orderNum++;
}
}
-
+
if(orderNum == 0) {
varOrders.add(projElems);
String tempVarPos = "";
-
+
for(int i = 0; i < projList.size(); i++) {
if(i == 0) {
tempVarPos = Integer.toString(0);
@@ -234,29 +230,29 @@ public class IndexWritingTool extends Configured implements Tool {
}
}
varOrderPos.add(tempVarPos);
-
+
}
-
- String[] vOrders = varOrders.toArray(new String[varOrders.size()]);
- String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]);
-
-
-
+
+ final String[] vOrders = varOrders.toArray(new String[varOrders.size()]);
+ final String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]);
+
+
+
conf.setStrings("varOrders", vOrders);
conf.setStrings("varOrderPos", vOrderPos);
}
-
- private static void setAccumuloOutput(String instStr, String zooStr, String userStr, String passStr, Job job, String tableName)
+
+ private static void setAccumuloOutput(final String instStr, final String zooStr, final String userStr, final String passStr, final Job job, final String tableName)
throws AccumuloSecurityException {
- AuthenticationToken token = new PasswordToken(passStr);
+ final AuthenticationToken token = new PasswordToken(passStr);
AccumuloOutputFormat.setConnectorInfo(job, userStr, token);
AccumuloOutputFormat.setDefaultTableName(job, tableName);
AccumuloOutputFormat.setCreateTables(job, true);
//TODO best way to do this?
-
+
if (zooStr.equals("mock")) {
AccumuloOutputFormat.setMockInstance(job, instStr);
} else {
@@ -270,41 +266,41 @@ public class IndexWritingTool extends Configured implements Tool {
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-
+
private static final Logger logger = Logger.getLogger(MyMapper.class);
final static Text EMPTY_TEXT = new Text();
final static Value EMPTY_VALUE = new Value(new byte[] {});
private String[] varOrderPos = null;
private String[] projElem = null;
private Pattern splitPattern = null;
- private List<List<Integer>> varPositions = Lists.newArrayList();
-
-
+ private final List<List<Integer>> varPositions = Lists.newArrayList();
+
+
@Override
- protected void setup(Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException,
+ protected void setup(final Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException,
InterruptedException {
-
- Configuration conf = context.getConfiguration();
-
+
+ final Configuration conf = context.getConfiguration();
+
varOrderPos = conf.getStrings("varOrderPos");
splitPattern = Pattern.compile("\t");
-
- for (String s : varOrderPos) {
- String[] pos = s.split(";");
- List<Integer> intPos = Lists.newArrayList();
+
+ for (final String s : varOrderPos) {
+ final String[] pos = s.split(";");
+ final List<Integer> intPos = Lists.newArrayList();
int i = 0;
- for(String t: pos) {
+ for(final String t: pos) {
i = Integer.parseInt(t);
intPos.add(i);
}
-
+
varPositions.add(intPos);
-
+
}
-
+
projElem = conf.get("projElems").split(";");
-
+
super.setup(context);
}
@@ -314,17 +310,17 @@ public class IndexWritingTool extends Configured implements Tool {
@Override
- public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
+ public void map(final LongWritable key, final Text value, final Context output) throws IOException, InterruptedException {
+
+ final String[] result = splitPattern.split(value.toString());
- String[] result = splitPattern.split(value.toString());
-
- for (List<Integer> list : varPositions) {
+ for (final List<Integer> list : varPositions) {
String values = "";
String vars = "";
- for (Integer i : list) {
+ for (final Integer i : list) {
if (values.length() == 0) {
values = result[i];
@@ -335,7 +331,7 @@ public class IndexWritingTool extends Configured implements Tool {
}
}
- Mutation m = new Mutation(new Text(values));
+ final Mutation m = new Mutation(new Text(values));
m.put(new Text(vars), EMPTY_TEXT, EMPTY_VALUE);
output.write(EMPTY_TEXT, m);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java
index 782840c..5c2c52c 100644
--- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java
+++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java
@@ -1,5 +1,31 @@
package org.apache.rya.accumulo.pig;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRdfEvalStatsDAO;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.rdftriplestore.evaluation.QueryJoinOptimizer;
+import org.apache.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics;
+import org.apache.rya.rdftriplestore.inference.InferenceEngine;
+import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
+import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.QueryParser;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -8,9 +34,9 @@ package org.apache.rya.accumulo.pig;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,31 +48,6 @@ package org.apache.rya.accumulo.pig;
import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRdfEvalStatsDAO;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;
-import org.apache.rya.rdftriplestore.evaluation.QueryJoinOptimizer;
-import org.apache.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics;
-import org.apache.rya.rdftriplestore.inference.InferenceEngine;
-import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
-import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.openrdf.query.algebra.QueryRoot;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.QueryParser;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
/**
* Created by IntelliJ IDEA.
@@ -74,7 +75,7 @@ public class SparqlQueryPigEngine {
return conf;
}
- public void setConf(AccumuloRdfConfiguration conf) {
+ public void setConf(final AccumuloRdfConfiguration conf) {
this.conf = conf;
}
@@ -92,14 +93,14 @@ public class SparqlQueryPigEngine {
}
if (inference || stats) {
- String instance = sparqlToPigTransformVisitor.getInstance();
- String zoo = sparqlToPigTransformVisitor.getZk();
- String user = sparqlToPigTransformVisitor.getUser();
- String pass = sparqlToPigTransformVisitor.getPassword();
+ final String instance = sparqlToPigTransformVisitor.getInstance();
+ final String zoo = sparqlToPigTransformVisitor.getZk();
+ final String user = sparqlToPigTransformVisitor.getUser();
+ final String pass = sparqlToPigTransformVisitor.getPassword();
- Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes());
+ final Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes(StandardCharsets.UTF_8));
- String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix();
+ final String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix();
conf.setTablePrefix(tablePrefix);
if (inference) {
logger.info("Using inference");
@@ -147,28 +148,28 @@ public class SparqlQueryPigEngine {
* @param hdfsSaveLocation to save the execution
* @throws java.io.IOException
*/
- public void runQuery(String sparql, String hdfsSaveLocation) throws IOException {
+ public void runQuery(final String sparql, final String hdfsSaveLocation) throws IOException {
Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null");
logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]");
pigServer.deleteFile(hdfsSaveLocation);
try {
- String pigScript = generatePigScript(sparql);
+ final String pigScript = generatePigScript(sparql);
if (logger.isDebugEnabled()) {
logger.debug("Pig script [" + pigScript + "]");
}
- pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes()));
+ pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes(StandardCharsets.UTF_8)));
pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new IOException(e);
}
}
- public String generatePigScript(String sparql) throws Exception {
+ public String generatePigScript(final String sparql) throws Exception {
Preconditions.checkNotNull(sparql, "Sparql query cannot be null");
- QueryParser parser = new SPARQLParser();
- ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
- QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr());
+ final QueryParser parser = new SPARQLParser();
+ final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+ final QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr());
// SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer();
// similarVarJoinOptimizer.optimize(tupleExpr, null, null);
@@ -189,31 +190,31 @@ public class SparqlQueryPigEngine {
}
- public static void main(String[] args) {
+ public static void main(final String[] args) {
try {
Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk cbuser cbpassword rdfTablePrefix.\n " +
"Sample command: java -cp java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf org.apache.rya.accumulo.pig.SparqlQueryPigEngine " +
"tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_");
- String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0])));
- String hdfsSaveLocation = args[1];
- SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
+ final String sparql = FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8);
+ final String hdfsSaveLocation = args[1];
+ final SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor();
visitor.setTablePrefix(args[6]);
visitor.setInstance(args[2]);
visitor.setZk(args[3]);
visitor.setUser(args[4]);
visitor.setPassword(args[5]);
- SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
+ final SparqlQueryPigEngine engine = new SparqlQueryPigEngine();
engine.setSparqlToPigTransformVisitor(visitor);
engine.setInference(false);
engine.setStats(false);
-
+
engine.init();
engine.runQuery(sparql, hdfsSaveLocation);
engine.destroy();
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
}
}
@@ -222,7 +223,7 @@ public class SparqlQueryPigEngine {
return hadoopDir;
}
- public void setHadoopDir(String hadoopDir) {
+ public void setHadoopDir(final String hadoopDir) {
this.hadoopDir = hadoopDir;
}
@@ -230,7 +231,7 @@ public class SparqlQueryPigEngine {
return pigServer;
}
- public void setPigServer(PigServer pigServer) {
+ public void setPigServer(final PigServer pigServer) {
this.pigServer = pigServer;
}
@@ -238,7 +239,7 @@ public class SparqlQueryPigEngine {
return execType;
}
- public void setExecType(ExecType execType) {
+ public void setExecType(final ExecType execType) {
this.execType = execType;
}
@@ -246,7 +247,7 @@ public class SparqlQueryPigEngine {
return inference;
}
- public void setInference(boolean inference) {
+ public void setInference(final boolean inference) {
this.inference = inference;
}
@@ -254,7 +255,7 @@ public class SparqlQueryPigEngine {
return stats;
}
- public void setStats(boolean stats) {
+ public void setStats(final boolean stats) {
this.stats = stats;
}
@@ -262,7 +263,7 @@ public class SparqlQueryPigEngine {
return sparqlToPigTransformVisitor;
}
- public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) {
+ public void setSparqlToPigTransformVisitor(final SparqlToPigTransformVisitor sparqlToPigTransformVisitor) {
this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java
----------------------------------------------------------------------
diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java
index 974888b..93266df 100644
--- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java
+++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java
@@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,21 @@ package org.apache.rya.accumulo.pig;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
@@ -41,17 +52,6 @@ import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.rdftriplestore.inference.InferenceEngine;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
-
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
@@ -94,7 +94,7 @@ public class StatementPatternStorage extends AccumuloStorage {
else {
ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
}
-
+
}
private Value getValue(Var subjectVar) {
@@ -115,8 +115,9 @@ public class StatementPatternStorage extends AccumuloStorage {
addInferredRanges(table, job);
}
- if (layout == null || ranges.size() == 0)
+ if (layout == null || ranges.size() == 0) {
throw new IllegalArgumentException("Range and/or layout is null. Check the query");
+ }
table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table);
tableName = new Text(table);
}
@@ -195,8 +196,9 @@ public class StatementPatternStorage extends AccumuloStorage {
RyaURI predicate_rya = RdfToRyaConversions.convertURI((URI) p_v);
RyaType object_rya = RdfToRyaConversions.convertValue(o_v);
TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null);
- if (strategy == null)
+ if (strategy == null) {
return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range());
+ }
Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null);
ByteRange byteRange = entry.getValue();
return new RdfCloudTripleStoreUtils.CustomEntry<org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT, Range>(
@@ -215,9 +217,9 @@ public class StatementPatternStorage extends AccumuloStorage {
ryaDAO.setConf(rdfConf);
try {
if (!mock) {
- ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes()));
+ ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes(StandardCharsets.UTF_8)));
} else {
- ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes()));
+ ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes(StandardCharsets.UTF_8)));
}
} catch (Exception e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3dc7c68..a6415ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,6 @@ under the License.
<jsr305.version>1.3.9-1</jsr305.version>
<jcip.version>1.0-1</jcip.version>
- <findbugs.plugin.version>3.0.4</findbugs.plugin.version>
<kafka.version>0.10.0.1</kafka.version>
<jopt-simple.version>4.9</jopt-simple.version>
@@ -682,7 +681,6 @@ under the License.
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
@@ -819,7 +817,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <argLine>-Dfile.encoding=${project.build.sourceEncoding}</argLine>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
</systemPropertyVariables>
@@ -957,6 +954,15 @@ under the License.
<artifactId>license-maven-plugin</artifactId>
<version>3.0</version>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.5</version>
+ <configuration>
+ <effort>Max</effort>
+ <threshold>Low</threshold>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
@@ -1030,6 +1036,23 @@ under the License.
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <failOnError>true</failOnError> <!-- These are serious defects that aren't allowed in Rya. Fail the build. -->
+ <visitors>DefaultEncodingDetector</visitors> <!-- Only specify detectors that should not detect any errors. -->
+ </configuration>
+ <executions>
+ <execution>
+ <id>analyze-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -1038,7 +1061,6 @@ under the License.
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
- <version>${findbugs.plugin.version}</version>
</plugin>
</plugins>
</reporting>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
----------------------------------------------------------------------
diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
index bf655ce..921acaa 100644
--- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
+++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
@@ -1,5 +1,3 @@
-package org.apache.rya.rdftriplestore;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,13 +16,13 @@ package org.apache.rya.rdftriplestore;
* specific language governing permissions and limitations
* under the License.
*/
-
-
+package org.apache.rya.rdftriplestore;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.Constructor;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -156,7 +154,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase {
final Value object, final Resource... contexts) throws SailException {
try {
final String cv_s = conf.getCv();
- final byte[] cv = cv_s == null ? null : cv_s.getBytes();
+ final byte[] cv = cv_s == null ? null : cv_s.getBytes(StandardCharsets.UTF_8);
final List<RyaStatement> ryaStatements = new ArrayList<>();
if (contexts != null && contexts.length > 0) {
for (final Resource context : contexts) {