You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/10/12 18:43:25 UTC
[04/11] incubator-rya git commit: RYA-401 Fixed all default charset bugs. Closes #243.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
index 50180ad..129bd6d 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -19,6 +19,7 @@
package org.apache.rya.periodic.notification.serialization;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
@@ -43,47 +44,47 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
- private static final byte[] DELIM_BYTE = "\u0002".getBytes();
-
- private byte[] toBytes(BindingSet bindingSet) {
+ private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8);
+
+ private byte[] toBytes(final BindingSet bindingSet) {
try {
return getBytes(getVarOrder(bindingSet), bindingSet);
- } catch(Exception e) {
+ } catch(final Exception e) {
log.trace("Unable to serialize BindingSet: " + bindingSet);
return new byte[0];
}
}
- private BindingSet fromBytes(byte[] bsBytes) {
+ private BindingSet fromBytes(final byte[] bsBytes) {
try{
- int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
- byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
- byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
- VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
- return getBindingSet(varOrder, bsBytesNoVarOrder);
- } catch(Exception e) {
+ final int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+ final byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+ final byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
+ final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+ return getBindingSet(varOrder, bsBytesNoVarOrder);
+ } catch(final Exception e) {
log.trace("Unable to deserialize BindingSet: " + bsBytes);
return new QueryBindingSet();
}
}
-
- private VariableOrder getVarOrder(BindingSet bs) {
+
+ private VariableOrder getVarOrder(final BindingSet bs) {
return new VariableOrder(bs.getBindingNames());
}
-
- private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
- byte[] bsBytes = serializer.convert(bs, varOrder);
- String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
- byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+
+ private byte[] getBytes(final VariableOrder varOrder, final BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+ final byte[] bsBytes = serializer.convert(bs, varOrder);
+ final String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+ final byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
}
-
- private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+
+ private BindingSet getBindingSet(final VariableOrder varOrder, final byte[] bsBytes) throws BindingSetConversionException {
return serializer.convert(bsBytes, varOrder);
}
@Override
- public BindingSet deserialize(String topic, byte[] bytes) {
+ public BindingSet deserialize(final String topic, final byte[] bytes) {
return fromBytes(bytes);
}
@@ -93,12 +94,12 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
}
@Override
- public void configure(Map<String, ?> arg0, boolean arg1) {
+ public void configure(final Map<String, ?> arg0, final boolean arg1) {
// Do nothing. Nothing to configure.
}
@Override
- public byte[] serialize(String topic, BindingSet bs) {
+ public byte[] serialize(final String topic, final BindingSet bs) {
return toBytes(bs);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
index 115074c..1073b6e 100644
--- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
+++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
@@ -19,6 +19,7 @@
package org.apache.rya.export.accumulo.util;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -336,7 +337,7 @@ public final class AccumuloRyaUtils {
public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException {
final List<String> authList = new ArrayList<>();
for (final byte[] authBytes : auths.getAuthorizations()) {
- final String auth = new String(authBytes);
+ final String auth = new String(authBytes, StandardCharsets.UTF_8);
authList.add(auth);
}
return addUserAuths(user, secOps, authList.toArray(new String[0]));
@@ -358,7 +359,7 @@ public final class AccumuloRyaUtils {
authList.add(currentAuth);
}
for (final String newAuth : auths) {
- authList.add(newAuth.getBytes());
+ authList.add(newAuth.getBytes(StandardCharsets.UTF_8));
}
final Authorizations result = new Authorizations(authList);
return result;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
index f3d523f..fd1bc4d 100644
--- a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
+++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
@@ -18,6 +18,8 @@
*/
package org.apache.rya.export.client.merge;
+import java.nio.charset.StandardCharsets;
+
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.export.api.MergerException;
import org.apache.rya.export.api.StatementMerger;
@@ -41,8 +43,8 @@ public class VisibilityStatementMerger implements StatementMerger {
final RyaStatement parentStatement = parent.get();
if(child.isPresent()) {
final RyaStatement childStatement = child.get();
- final String pVis = new String(parentStatement.getColumnVisibility());
- final String cVis = new String(childStatement.getColumnVisibility());
+ final String pVis = new String(parentStatement.getColumnVisibility(), StandardCharsets.UTF_8);
+ final String cVis = new String(childStatement.getColumnVisibility(), StandardCharsets.UTF_8);
String visibility = "";
final Joiner join = Joiner.on(")&(");
if(pVis.isEmpty() || cVis.isEmpty()) {
@@ -50,7 +52,7 @@ public class VisibilityStatementMerger implements StatementMerger {
} else {
visibility = "(" + join.join(pVis, cVis) + ")";
}
- parentStatement.setColumnVisibility(visibility.getBytes());
+ parentStatement.setColumnVisibility(visibility.getBytes(StandardCharsets.UTF_8));
return Optional.of(parentStatement);
}
return parent;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
index 4597400..59b92ba 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
@@ -22,6 +22,7 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.ArrayList;
@@ -81,9 +82,6 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
-
-import com.google.common.base.Joiner;
-
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.mr.AccumuloHDFSFileInputFormat;
import org.apache.rya.accumulo.mr.MRUtils;
@@ -105,6 +103,8 @@ import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.layout.TablePrefixLayoutStrategy;
import org.apache.rya.indexing.accumulo.ConfigUtils;
+import com.google.common.base.Joiner;
+
/**
* Handles copying data from a parent instance into a child instance.
*/
@@ -589,9 +589,9 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
final Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt");
final Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100);
log.info("Creating splits file at: " + splitsPath);
- try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) {
+ try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)), false, StandardCharsets.UTF_8.name())) {
for (final Text split : splits) {
- final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)));
+ final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)), StandardCharsets.UTF_8);
out.println(encoded);
}
}
@@ -873,12 +873,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
}
log.info("Starting Copy Tool");
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread thread, final Throwable throwable) {
- log.error("Uncaught exception in " + thread.getName(), throwable);
- }
- });
+ Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> log.error("Uncaught exception in " + thread.getName(), throwable));
final CopyTool copyTool = new CopyTool();
final int returnCode = copyTool.setupAndRun(args);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
index e702e64..5a3b928 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
@@ -19,6 +19,7 @@
package org.apache.rya.accumulo.mr.merge.util;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -49,13 +50,6 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-import org.openrdf.model.Literal;
-import org.openrdf.model.ValueFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableSet;
-
-import info.aduna.iteration.CloseableIteration;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.accumulo.mr.MRUtils;
@@ -66,6 +60,13 @@ import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.openrdf.model.Literal;
+import org.openrdf.model.ValueFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+
+import info.aduna.iteration.CloseableIteration;
/**
* Utility methods for an Accumulo Rya instance.
@@ -515,7 +516,7 @@ public final class AccumuloRyaUtils {
public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException {
final List<String> authList = new ArrayList<>();
for (final byte[] authBytes : auths.getAuthorizations()) {
- final String auth = new String(authBytes);
+ final String auth = new String(authBytes, StandardCharsets.UTF_8);
authList.add(auth);
}
return addUserAuths(user, secOps, authList.toArray(new String[0]));
@@ -537,7 +538,7 @@ public final class AccumuloRyaUtils {
authList.add(currentAuth);
}
for (final String newAuth : auths) {
- authList.add(newAuth.getBytes());
+ authList.add(newAuth.getBytes(StandardCharsets.UTF_8));
}
final Authorizations result = new Authorizations(authList);
return result;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
index 9627c54..42109db 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
@@ -18,15 +18,32 @@
*/
package org.apache.rya.accumulo.mr.merge.util;
-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.mr.merge.CopyTool;
+import org.apache.rya.accumulo.mr.merge.util.QueryRuleset.QueryRulesetException;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.inference.InferJoin;
+import org.apache.rya.rdftriplestore.inference.InferUnion;
+import org.apache.rya.rdftriplestore.inference.InferenceEngine;
+import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SameAsVisitor;
+import org.apache.rya.rdftriplestore.inference.SubClassOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SubPropertyOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
+import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
+import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
+import org.apache.rya.rdftriplestore.utils.TransitivePropertySP;
+import org.apache.rya.sail.config.RyaSailFactory;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
@@ -52,22 +69,6 @@ import org.openrdf.query.parser.ParsedTupleQuery;
import org.openrdf.query.parser.QueryParserUtil;
import org.openrdf.sail.SailException;
-import org.apache.rya.accumulo.mr.merge.CopyTool;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
-import org.apache.rya.rdftriplestore.inference.InferJoin;
-import org.apache.rya.rdftriplestore.inference.InferUnion;
-import org.apache.rya.rdftriplestore.inference.InferenceEngine;
-import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SameAsVisitor;
-import org.apache.rya.rdftriplestore.inference.SubClassOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SubPropertyOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
-import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
-import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
-import org.apache.rya.rdftriplestore.utils.TransitivePropertySP;
-import org.apache.rya.sail.config.RyaSailFactory;
-
/**
* Represents a set of {@link CopyRule} instances derived from a query. The ruleset determines a logical
* subset of statements in Rya, such that statements selected by the ruleset are at least enough to answer
@@ -432,16 +433,7 @@ public class QueryRuleset {
final String queryFile = conf.get(CopyTool.QUERY_FILE_PROP);
if (query == null && queryFile != null) {
try {
- final FileReader fileReader = new FileReader(queryFile);
- final BufferedReader reader = new BufferedReader(fileReader);
- final StringBuilder builder = new StringBuilder();
- String line = reader.readLine();
- while (line != null) {
- builder.append(line).append("\n");
- line = reader.readLine();
- }
- query = builder.toString();
- reader.close();
+ query = FileUtils.readFileToString(new File(queryFile), StandardCharsets.UTF_8);
conf.set(CopyTool.QUERY_STRING_PROP, query);
}
catch (final IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 4070849..3fea6ed 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,6 +18,7 @@
*/
package org.apache.rya.indexing.pcj.fluo.demo;
+import java.nio.charset.StandardCharsets;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
@@ -96,7 +97,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
/**
* Used to pause the demo waiting for the presenter to hit the Enter key.
*/
- private final java.util.Scanner keyboard = new java.util.Scanner(System.in);
+ private final java.util.Scanner keyboard = new java.util.Scanner(System.in, StandardCharsets.UTF_8.name());
@Override
public void execute(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java b/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
index 105e852..f630df0 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
@@ -1,5 +1,3 @@
-package org.apache.rya.joinselect.mr;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -8,9 +6,9 @@ package org.apache.rya.joinselect.mr;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,21 +16,16 @@ package org.apache.rya.joinselect.mr;
* specific language governing permissions and limitations
* under the License.
*/
-
-
+package org.apache.rya.joinselect.mr;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH;
import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_TABLE;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
-import org.apache.rya.joinselect.mr.utils.CardinalityType;
-import org.apache.rya.joinselect.mr.utils.CompositeType;
-import org.apache.rya.joinselect.mr.utils.JoinSelectStatsUtil;
-import org.apache.rya.joinselect.mr.utils.TripleCard;
-
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -45,6 +38,10 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
+import org.apache.rya.joinselect.mr.utils.CardinalityType;
+import org.apache.rya.joinselect.mr.utils.CompositeType;
+import org.apache.rya.joinselect.mr.utils.JoinSelectStatsUtil;
+import org.apache.rya.joinselect.mr.utils.TripleCard;
public class JoinSelectProspectOutput extends Configured implements Tool {
@@ -55,21 +52,22 @@ public class JoinSelectProspectOutput extends Configured implements Tool {
Text inText = new Text();
Pattern splitPattern = Pattern.compile(DELIM);
- public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+ @Override
+ public void map(final Key key, final Value data, final Context context) throws IOException, InterruptedException {
key.getRow(inText);
- String[] cardData = splitPattern.split(inText.toString().trim(), 4);
+ final String[] cardData = splitPattern.split(inText.toString().trim(), 4);
// System.out.println("Card data is " + cardData[0] + ", "+ cardData[1] + ", "+ cardData[2]);
if (cardData.length == 3 && ((cardData[0].equals("subject")) || (cardData[0].equals("object")) || (cardData[0].equals("predicate")))) {
- Text tripleValType = new Text(cardData[0]);
- Text cardKey = new Text(cardData[1]);
- LongWritable ts = new LongWritable(Long.valueOf(cardData[2]));
+ final Text tripleValType = new Text(cardData[0]);
+ final Text cardKey = new Text(cardData[1]);
+ final LongWritable ts = new LongWritable(Long.valueOf(cardData[2]));
- String s = new String(data.get());
- LongWritable card = new LongWritable(Long.parseLong(s));
+ final String s = new String(data.get(), StandardCharsets.UTF_8);
+ final LongWritable card = new LongWritable(Long.parseLong(s));
- CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
- TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
+ final CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
+ final TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts)));
// System.out.println("Card mapper output key is " + cType + " and value is " + tCard );
@@ -77,15 +75,15 @@ public class JoinSelectProspectOutput extends Configured implements Tool {
} else if (cardData.length == 4
&& ((cardData[0].equals("subjectpredicate")) || (cardData[0].equals("subjectobject")) || (cardData[0].equals("predicateobject")))) {
- Text tripleValType = new Text(cardData[0]);
- Text cardKey = new Text(cardData[1] + DELIM + cardData[2]);
- LongWritable ts = new LongWritable(Long.valueOf(cardData[3]));
+ final Text tripleValType = new Text(cardData[0]);
+ final Text cardKey = new Text(cardData[1] + DELIM + cardData[2]);
+ final LongWritable ts = new LongWritable(Long.valueOf(cardData[3]));
- String s = new String(data.get());
- LongWritable card = new LongWritable(Long.parseLong(s));
+ final String s = new String(data.get(), StandardCharsets.UTF_8);
+ final LongWritable card = new LongWritable(Long.parseLong(s));
- CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
- TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
+ final CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
+ final TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts)));
// System.out.println("Card mapper output key is " + cType + " and value is " + tCard );
@@ -97,16 +95,16 @@ public class JoinSelectProspectOutput extends Configured implements Tool {
}
@Override
- public int run(String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException {
+ public int run(final String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException {
- Configuration conf = getConf();
- String inTable = conf.get(PROSPECTS_TABLE);
- String auths = conf.get(AUTHS);
- String outPath = conf.get(PROSPECTS_OUTPUTPATH);
+ final Configuration conf = getConf();
+ final String inTable = conf.get(PROSPECTS_TABLE);
+ final String auths = conf.get(AUTHS);
+ final String outPath = conf.get(PROSPECTS_OUTPUTPATH);
assert inTable != null && outPath != null;
- Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ final Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
index ebcf6c3..f408b7d 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
@@ -21,6 +21,7 @@ package org.apache.rya.prospector.plans.impl;
import static org.apache.rya.prospector.utils.ProspectorConstants.COUNT;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -62,7 +63,7 @@ import org.openrdf.model.vocabulary.XMLSchema;
public class CountPlan implements IndexWorkPlan {
@Override
- public Collection<Map.Entry<IntermediateProspect, LongWritable>> map(RyaStatement ryaStatement) {
+ public Collection<Map.Entry<IntermediateProspect, LongWritable>> map(final RyaStatement ryaStatement) {
final RyaURI subject = ryaStatement.getSubject();
final RyaURI predicate = ryaStatement.getPredicate();
final String subjpred = ryaStatement.getSubject().getData() + DELIM + ryaStatement.getPredicate().getData();
@@ -71,7 +72,7 @@ public class CountPlan implements IndexWorkPlan {
final RyaType object = ryaStatement.getObject();
final int localIndex = URIUtil.getLocalNameIndex(subject.getData());
final String namespace = subject.getData().substring(0, localIndex - 1);
- final String visibility = new String(ryaStatement.getColumnVisibility());
+ final String visibility = new String(ryaStatement.getColumnVisibility(), StandardCharsets.UTF_8);
final List<Map.Entry<IntermediateProspect, LongWritable>> entries = new ArrayList<>(7);
@@ -149,7 +150,7 @@ public class CountPlan implements IndexWorkPlan {
}
@Override
- public Collection<Map.Entry<IntermediateProspect, LongWritable>> combine(IntermediateProspect prospect, Iterable<LongWritable> counts) {
+ public Collection<Map.Entry<IntermediateProspect, LongWritable>> combine(final IntermediateProspect prospect, final Iterable<LongWritable> counts) {
long sum = 0;
for(final LongWritable count : counts) {
sum += count.get();
@@ -158,7 +159,7 @@ public class CountPlan implements IndexWorkPlan {
}
@Override
- public void reduce(IntermediateProspect prospect, Iterable<LongWritable> counts, Date timestamp, Reducer.Context context) throws IOException, InterruptedException {
+ public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
long sum = 0;
for(final LongWritable count : counts) {
sum += count.get();
@@ -172,7 +173,7 @@ public class CountPlan implements IndexWorkPlan {
final String dataType = prospect.getDataType();
final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
- final Value sumValue = new Value(("" + sum).getBytes());
+ final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
m.put(COUNT, prospect.getDataType(), visibility, timestamp.getTime(), sumValue);
context.write(null, m);
@@ -185,7 +186,7 @@ public class CountPlan implements IndexWorkPlan {
}
@Override
- public String getCompositeValue(List<String> indices){
+ public String getCompositeValue(final List<String> indices){
final Iterator<String> indexIt = indices.iterator();
String compositeIndex = indexIt.next();
while (indexIt.hasNext()){
@@ -196,7 +197,7 @@ public class CountPlan implements IndexWorkPlan {
}
@Override
- public List<IndexEntry> query(Connector connector, String tableName, List<Long> prospectTimes, String type, String compositeIndex, String dataType, String[] auths) throws TableNotFoundException {
+ public List<IndexEntry> query(final Connector connector, final String tableName, final List<Long> prospectTimes, final String type, final String compositeIndex, final String dataType, final String[] auths) throws TableNotFoundException {
assert connector != null && tableName != null && type != null && compositeIndex != null;
final BatchScanner bs = connector.createBatchScanner(tableName, new Authorizations(auths), 4);
@@ -242,7 +243,7 @@ public class CountPlan implements IndexWorkPlan {
// Create an entry using the values that were found.
final String entryDataType = k.getColumnQualifier().toString();
final String entryVisibility = k.getColumnVisibility().toString();
- final Long entryCount = Long.parseLong(new String(v.get()));
+ final Long entryCount = Long.parseLong(new String(v.get(), StandardCharsets.UTF_8));
indexEntries.add(
IndexEntry.builder()
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
index 4dc9253..d75730b 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
@@ -24,6 +24,7 @@ import static org.apache.rya.prospector.utils.ProspectorConstants.PASSWORD;
import static org.apache.rya.prospector.utils.ProspectorConstants.USERNAME;
import static org.apache.rya.prospector.utils.ProspectorConstants.ZOOKEEPERS;
+import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
@@ -56,7 +57,7 @@ public class ProspectorUtils {
public static final long INDEXED_DATE_SORT_VAL = 999999999999999999L; // 18 char long, same length as date format pattern below
public static final String INDEXED_DATE_FORMAT = "yyyyMMddHHmmsssSSS";
- public static String getReverseIndexDateTime(Date date) {
+ public static String getReverseIndexDateTime(final Date date) {
Validate.notNull(date);
final String formattedDateString = new SimpleDateFormat(INDEXED_DATE_FORMAT).format(date);
final long diff = INDEXED_DATE_SORT_VAL - Long.valueOf(formattedDateString);
@@ -64,7 +65,7 @@ public class ProspectorUtils {
return Long.toString(diff);
}
- public static Map<String, IndexWorkPlan> planMap(Collection<IndexWorkPlan> plans) {
+ public static Map<String, IndexWorkPlan> planMap(final Collection<IndexWorkPlan> plans) {
final Map<String, IndexWorkPlan> planMap = new HashMap<>();
for(final IndexWorkPlan plan : plans) {
planMap.put(plan.getIndexType(), plan);
@@ -72,7 +73,7 @@ public class ProspectorUtils {
return planMap;
}
- public static void initMRJob(Job job, String table, String outtable, String[] auths) throws AccumuloSecurityException {
+ public static void initMRJob(final Job job, final String table, final String outtable, final String[] auths) throws AccumuloSecurityException {
final Configuration conf = job.getConfiguration();
final String username = conf.get(USERNAME);
final String password = conf.get(PASSWORD);
@@ -91,7 +92,7 @@ public class ProspectorUtils {
throw new IllegalArgumentException("Must specify either mock or zookeepers");
}
- AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes()));
+ AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
AccumuloInputFormat.setInputTableName(job, table);
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
@@ -100,11 +101,11 @@ public class ProspectorUtils {
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
- AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes()));
+ AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
AccumuloOutputFormat.setDefaultTableName(job, outtable);
}
- public static void addMRPerformance(Configuration conf) {
+ public static void addMRPerformance(final Configuration conf) {
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("io.sort.mb", "256");
@@ -112,7 +113,7 @@ public class ProspectorUtils {
conf.set("mapred.map.output.compression.codec", GzipCodec.class.getName());
}
- public static Instance instance(Configuration conf) {
+ public static Instance instance(final Configuration conf) {
assert conf != null;
final String instance_str = conf.get(INSTANCE);
@@ -127,7 +128,7 @@ public class ProspectorUtils {
}
}
- public static Connector connector(Instance instance, Configuration conf) throws AccumuloException, AccumuloSecurityException {
+ public static Connector connector(Instance instance, final Configuration conf) throws AccumuloException, AccumuloSecurityException {
final String username = conf.get(USERNAME);
final String password = conf.get(PASSWORD);
if (instance == null) {
@@ -136,7 +137,7 @@ public class ProspectorUtils {
return instance.getConnector(username, new PasswordToken(password));
}
- public static void writeMutations(Connector connector, String tableName, Collection<Mutation> mutations) throws TableNotFoundException, MutationsRejectedException {
+ public static void writeMutations(final Connector connector, final String tableName, final Collection<Mutation> mutations) throws TableNotFoundException, MutationsRejectedException {
final BatchWriter bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4);
for(final Mutation mutation : mutations) {
bw.addMutation(mutation);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java b/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
index bb78373..5515a5b 100644
--- a/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
+++ b/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,10 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.rya.accumulo.mr.MRUtils;
-import org.apache.rya.reasoning.Fact;
-import org.apache.rya.reasoning.Schema;
-
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -45,13 +42,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.reasoning.Fact;
+import org.apache.rya.reasoning.Schema;
import org.openrdf.OpenRDFException;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
-import org.openrdf.model.vocabulary.RDF;
import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryLanguage;
import org.openrdf.query.TupleQuery;
@@ -98,7 +98,7 @@ public class ConformanceTest extends Configured implements Tool {
Set<Statement> inferred = new HashSet<>();
Set<Statement> error = new HashSet<>();
@Override
- public void handleStatement(Statement st) {
+ public void handleStatement(final Statement st) {
if (types.contains(TEST_ENTAILMENT)) {
expected.add(st);
}
@@ -107,7 +107,7 @@ public class ConformanceTest extends Configured implements Tool {
}
}
String type() {
- StringBuilder sb = new StringBuilder();
+ final StringBuilder sb = new StringBuilder();
if (types.contains(TEST_CONSISTENCY)) {
sb.append("{Consistency}");
}
@@ -127,17 +127,17 @@ public class ConformanceTest extends Configured implements Tool {
private static class OutputCollector extends RDFHandlerBase {
Set<Statement> triples = new HashSet<>();
@Override
- public void handleStatement(Statement st) {
+ public void handleStatement(final Statement st) {
triples.add(st);
}
}
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
ToolRunner.run(new ConformanceTest(), args);
}
@Override
- public int run(String[] args) throws Exception {
+ public int run(final String[] args) throws Exception {
// Validate command
if (args.length < 1 || args.length > 2) {
System.out.println("Usage:\n");
@@ -155,11 +155,11 @@ public class ConformanceTest extends Configured implements Tool {
System.exit(1);
}
- Set<Value> conformanceTestURIs = new HashSet<>();
+ final Set<Value> conformanceTestURIs = new HashSet<>();
Collection<OwlTest> conformanceTests = new LinkedList<>();
- List<OwlTest> successes = new LinkedList<>();
- List<OwlTest> failures = new LinkedList<>();
- Configuration conf = getConf();
+ final List<OwlTest> successes = new LinkedList<>();
+ final List<OwlTest> failures = new LinkedList<>();
+ final Configuration conf = getConf();
Repository repo;
File workingDir;
@@ -167,13 +167,13 @@ public class ConformanceTest extends Configured implements Tool {
if (args.length == 2) {
workingDir = new File(args[1]);
RDFFormat inputFormat= RDFFormat.RDFXML;
- String formatString = conf.get(MRUtils.FORMAT_PROP);
+ final String formatString = conf.get(MRUtils.FORMAT_PROP);
if (formatString != null) {
inputFormat = RDFFormat.valueOf(formatString);
}
repo = new SailRepository(new MemoryStore());
repo.initialize();
- RepositoryConnection conn = repo.getConnection();
+ final RepositoryConnection conn = repo.getConnection();
conn.add(new FileInputStream(args[0]), "", inputFormat);
conn.close();
}
@@ -185,7 +185,7 @@ public class ConformanceTest extends Configured implements Tool {
}
// Query for the tests we're interested in
- RepositoryConnection conn = repo.getConnection();
+ final RepositoryConnection conn = repo.getConnection();
conformanceTestURIs.addAll(getTestURIs(conn, TEST_INCONSISTENCY));
conformanceTestURIs.addAll(getTestURIs(conn, TEST_CONSISTENCY));
conformanceTestURIs.addAll(getTestURIs(conn, TEST_ENTAILMENT));
@@ -195,9 +195,9 @@ public class ConformanceTest extends Configured implements Tool {
repo.shutDown();
// Set up a MiniAccumulo cluster and set up conf to connect to it
- String username = "root";
- String password = "root";
- MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
+ final String username = "root";
+ final String password = "root";
+ final MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
mini.start();
conf.set(MRUtils.AC_INSTANCE_PROP, mini.getInstanceName());
conf.set(MRUtils.AC_ZK_PROP, mini.getZooKeepers());
@@ -207,7 +207,7 @@ public class ConformanceTest extends Configured implements Tool {
conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "temp_");
// Run the conformance tests
int result;
- for (OwlTest test : conformanceTests) {
+ for (final OwlTest test : conformanceTests) {
System.out.println(test.uri);
result = runTest(conf, args, test);
if (result != 0) {
@@ -225,14 +225,14 @@ public class ConformanceTest extends Configured implements Tool {
mini.stop();
System.out.println("\n" + successes.size() + " successful tests:");
- for (OwlTest test : successes) {
+ for (final OwlTest test : successes) {
System.out.println("\t[SUCCESS] " + test.type() + " " + test.name);
}
System.out.println("\n" + failures.size() + " failed tests:");
- for (OwlTest test : failures) {
+ for (final OwlTest test : failures) {
System.out.println("\t[FAIL] " + test.type() + " " + test.name);
System.out.println("\t\t(" + test.description + ")");
- for (Statement triple : test.error) {
+ for (final Statement triple : test.error) {
if (test.types.contains(TEST_ENTAILMENT)) {
System.out.println("\t\tExpected: " + triple);
}
@@ -250,23 +250,23 @@ public class ConformanceTest extends Configured implements Tool {
* @param OwlTest Contains premise/conclusion graphs, will store result
* @return Return value of the MapReduce job
*/
- int runTest(Configuration conf, String[] args, OwlTest test)
+ int runTest(final Configuration conf, final String[] args, final OwlTest test)
throws Exception {
conf.setInt(MRReasoningUtils.STEP_PROP, 0);
conf.setInt(MRReasoningUtils.SCHEMA_UPDATE_PROP, 0);
conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, true);
conf.setBoolean(MRReasoningUtils.OUTPUT_FLAG, true);
// Connect to MiniAccumulo and load the test
- Repository repo = MRReasoningUtils.getRepository(conf);
+ final Repository repo = MRReasoningUtils.getRepository(conf);
repo.initialize();
- RepositoryConnection conn = repo.getConnection();
+ final RepositoryConnection conn = repo.getConnection();
conn.clear();
conn.add(new StringReader(test.premise), "", RDFFormat.RDFXML);
conn.close();
repo.shutDown();
// Run the reasoner
- ReasoningDriver reasoner = new ReasoningDriver();
- int result = ToolRunner.run(conf, reasoner, args);
+ final ReasoningDriver reasoner = new ReasoningDriver();
+ final int result = ToolRunner.run(conf, reasoner, args);
test.success = (result == 0);
// Inconsistency test: successful if determined inconsistent
if (test.types.contains(TEST_INCONSISTENCY)) {
@@ -281,21 +281,21 @@ public class ConformanceTest extends Configured implements Tool {
|| test.types.contains(TEST_ENTAILMENT)) {
System.out.println("Reading inferred triples...");
// Read in the inferred triples from HDFS:
- Schema schema = MRReasoningUtils.loadSchema(conf);
- FileSystem fs = FileSystem.get(conf);
- Path path = MRReasoningUtils.getOutputPath(conf, "final");
- OutputCollector inferred = new OutputCollector();
- NTriplesParser parser = new NTriplesParser();
+ final Schema schema = MRReasoningUtils.loadSchema(conf);
+ final FileSystem fs = FileSystem.get(conf);
+ final Path path = MRReasoningUtils.getOutputPath(conf, "final");
+ final OutputCollector inferred = new OutputCollector();
+ final NTriplesParser parser = new NTriplesParser();
parser.setRDFHandler(inferred);
if (fs.isDirectory(path)) {
- for (FileStatus status : fs.listStatus(path)) {
- String s = status.getPath().getName();
+ for (final FileStatus status : fs.listStatus(path)) {
+ final String s = status.getPath().getName();
if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)
|| s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
continue;
}
- BufferedReader br = new BufferedReader(
- new InputStreamReader(fs.open(status.getPath())));
+ final BufferedReader br = new BufferedReader(
+ new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8));
parser.parse(br, "");
br.close();
}
@@ -306,8 +306,8 @@ public class ConformanceTest extends Configured implements Tool {
if (test.types.contains(TEST_ENTAILMENT)) {
// Check expected inferences against the inferred triples and
// the schema reasoner
- for (Statement st : test.expected) {
- Fact fact = new Fact(st);
+ for (final Statement st : test.expected) {
+ final Fact fact = new Fact(st);
if (!test.inferred.contains(st)
&& !triviallyTrue(fact.getTriple(), schema)
&& !schema.containsTriple(fact.getTriple())) {
@@ -317,8 +317,8 @@ public class ConformanceTest extends Configured implements Tool {
}
// Non-entailment test: failure if non-expected triples inferred
if (test.types.contains(TEST_NONENTAILMENT)) {
- for (Statement st : test.unexpected) {
- Fact fact = new Fact(st);
+ for (final Statement st : test.unexpected) {
+ final Fact fact = new Fact(st);
if (test.inferred.contains(st)
|| schema.containsTriple(fact.getTriple())) {
test.error.add(st);
@@ -336,18 +336,18 @@ public class ConformanceTest extends Configured implements Tool {
* Query a connection for conformance tests matching a particular
* test type.
*/
- Set<Value> getTestURIs(RepositoryConnection conn, String testType)
+ Set<Value> getTestURIs(final RepositoryConnection conn, final String testType)
throws IOException, OpenRDFException {
- Set<Value> testURIs = new HashSet<>();
- TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+ final Set<Value> testURIs = new HashSet<>();
+ final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
"select ?test where { " +
"?test <" + TYPE + "> <" + testType + "> .\n" +
"?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
"?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
"}");
- TupleQueryResult queryResult = query.evaluate();
+ final TupleQueryResult queryResult = query.evaluate();
while (queryResult.hasNext()) {
- BindingSet bindings = queryResult.next();
+ final BindingSet bindings = queryResult.next();
testURIs.add(bindings.getValue("test"));
}
queryResult.close();
@@ -357,10 +357,10 @@ public class ConformanceTest extends Configured implements Tool {
/**
* Query a connection for conformance test details.
*/
- Collection<OwlTest> getTests(RepositoryConnection conn, Set<Value> testURIs)
+ Collection<OwlTest> getTests(final RepositoryConnection conn, final Set<Value> testURIs)
throws IOException, OpenRDFException {
- Map<Value, OwlTest> tests = new HashMap<>();
- TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+ final Map<Value, OwlTest> tests = new HashMap<>();
+ final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
"select * where { " +
"?test <" + TYPE + "> ?testType .\n" +
"?test <" + TEST_PREMISE + "> ?graph .\n" +
@@ -371,10 +371,10 @@ public class ConformanceTest extends Configured implements Tool {
"OPTIONAL {?test <" + TEST_CONCLUSION + "> ?conclusion .}\n" +
"OPTIONAL {?test <" + TEST_NONCONCLUSION + "> ?nonentailed .}\n" +
"}");
- TupleQueryResult queryResult = query.evaluate();
+ final TupleQueryResult queryResult = query.evaluate();
while (queryResult.hasNext()) {
- BindingSet bindings = queryResult.next();
- Value uri = bindings.getValue("test");
+ final BindingSet bindings = queryResult.next();
+ final Value uri = bindings.getValue("test");
if (testURIs.contains(uri)) {
OwlTest test;
if (tests.containsKey(uri)) {
@@ -397,9 +397,9 @@ public class ConformanceTest extends Configured implements Tool {
test.types.add(bindings.getValue("testType").stringValue());
}
}
- for (OwlTest test : tests.values()) {
+ for (final OwlTest test : tests.values()) {
if (test.compareTo != null) {
- RDFXMLParser parser = new RDFXMLParser();
+ final RDFXMLParser parser = new RDFXMLParser();
parser.setRDFHandler(test);
parser.parse(new StringReader(test.compareTo), "");
}
@@ -413,10 +413,10 @@ public class ConformanceTest extends Configured implements Tool {
* tests, such as an implicit "[bnode] type Ontology" triple or a
* "[class] type Class" triple as long as the class exists.
*/
- boolean triviallyTrue(Statement triple, Schema schema) {
- Resource s = triple.getSubject();
- URI p = triple.getPredicate();
- Value o = triple.getObject();
+ boolean triviallyTrue(final Statement triple, final Schema schema) {
+ final Resource s = triple.getSubject();
+ final URI p = triple.getPredicate();
+ final Value o = triple.getObject();
if (p.equals(RDF.TYPE)) {
if (o.equals(OWL.ONTOLOGY)) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
index 489fd34..eac06c6 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -20,19 +20,12 @@ package org.apache.rya.accumulo.mr;
*/
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.resolver.RyaTripleContext;
-import org.apache.rya.api.resolver.triple.TripleRow;
-import org.apache.rya.api.resolver.triple.TripleRowResolverException;
-
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
@@ -43,6 +36,13 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
import org.apache.spark.graphx.Edge;
/**
@@ -63,8 +63,8 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
* @return A RecordReader that can be used to fetch RyaStatementWritables.
*/
@Override
- public RecordReader<Object, Edge> createRecordReader(InputSplit split,
- TaskAttemptContext context) {
+ public RecordReader<Object, Edge> createRecordReader(final InputSplit split,
+ final TaskAttemptContext context) {
return new RyaStatementRecordReader();
}
@@ -77,7 +77,7 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
* Statements will be read from the Rya table associated with
* this layout.
*/
- public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+ public static void setTableLayout(final Job conf, final TABLE_LAYOUT layout) {
conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
}
@@ -89,8 +89,8 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
private RyaTripleContext ryaContext;
private TABLE_LAYOUT tableLayout;
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName, RangeInputSplit split) {
+ protected void setupIterators(final TaskAttemptContext context,
+ final Scanner scanner, final String tableName, final RangeInputSplit split) {
}
/**
@@ -104,7 +104,7 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
* if thrown by the superclass's initialize method.
*/
@Override
- public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+ public void initialize(final InputSplit inSplit, final TaskAttemptContext attempt)
throws IOException {
super.initialize(inSplit, attempt);
this.tableLayout = MRUtils.getTableLayout(
@@ -127,15 +127,16 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
*/
@Override
public boolean nextKeyValue() throws IOException {
- if (!scannerIterator.hasNext())
- return false;
- Entry<Key, Value> entry = scannerIterator.next();
+ if (!scannerIterator.hasNext()) {
+ return false;
+ }
+ final Entry<Key, Value> entry = scannerIterator.next();
++numKeysRead;
currentKey = entry.getKey();
try {
currentK = currentKey.getRow();
- RyaTypeWritable rtw = new RyaTypeWritable();
- RyaStatement stmt = this.ryaContext.deserializeTriple(
+ final RyaTypeWritable rtw = new RyaTypeWritable();
+ final RyaStatement stmt = this.ryaContext.deserializeTriple(
this.tableLayout, new TripleRow(entry.getKey().getRow()
.getBytes(), entry.getKey().getColumnFamily()
.getBytes(), entry.getKey()
@@ -144,28 +145,28 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
.getColumnVisibility().getBytes(), entry
.getValue().get()));
- long subHash = getVertexId(stmt.getSubject());
- long objHash = getVertexId(stmt.getObject());
+ final long subHash = getVertexId(stmt.getSubject());
+ final long objHash = getVertexId(stmt.getObject());
rtw.setRyaType(stmt.getPredicate());
- Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
+ final Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
subHash, objHash, rtw);
currentV = writable;
- } catch (TripleRowResolverException e) {
+ } catch (final TripleRowResolverException e) {
throw new IOException(e);
}
return true;
}
protected List<IteratorSetting> contextIterators(
- TaskAttemptContext context, String tableName) {
+ final TaskAttemptContext context, final String tableName) {
return getIterators(context);
}
@Override
- protected void setupIterators(TaskAttemptContext context,
- Scanner scanner, String tableName,
- org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+ protected void setupIterators(final TaskAttemptContext context,
+ final Scanner scanner, final String tableName,
+ final org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
List<IteratorSetting> iterators = null;
if (null == split) {
@@ -177,13 +178,14 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
}
}
- for (IteratorSetting iterator : iterators)
- scanner.addScanIterator(iterator);
+ for (final IteratorSetting iterator : iterators) {
+ scanner.addScanIterator(iterator);
+ }
}
}
- public static long getVertexId(RyaType resource) throws IOException {
+ public static long getVertexId(final RyaType resource) throws IOException {
String uri = "";
if (resource != null) {
uri = resource.getData().toString();
@@ -193,20 +195,20 @@ public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
// the digested string, the collision ratio is less than 0.0001%
// using custom hash function should significantly reduce the
// collision ratio
- MessageDigest messageDigest = MessageDigest
+ final MessageDigest messageDigest = MessageDigest
.getInstance("SHA-256");
- messageDigest.update(uri.getBytes());
- String encryptedString = new String(messageDigest.digest());
+ messageDigest.update(uri.getBytes(StandardCharsets.UTF_8));
+ final String encryptedString = new String(messageDigest.digest(), StandardCharsets.UTF_8);
return hash(encryptedString);
}
- catch (NoSuchAlgorithmException e) {
+ catch (final NoSuchAlgorithmException e) {
throw new IOException(e);
}
}
- public static long hash(String string) {
+ public static long hash(final String string) {
long h = 1125899906842597L; // prime
- int len = string.length();
+ final int len = string.length();
for (int i = 0; i < len; i++) {
h = 31 * h + string.charAt(i);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
index 5332260..0d42df2 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
@@ -22,6 +22,7 @@ package org.apache.rya.accumulo.mr;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
@@ -94,7 +95,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param visibility A comma-separated list of authorizations.
*/
- public static void setDefaultVisibility(Job job, String visibility) {
+ public static void setDefaultVisibility(final Job job, final String visibility) {
if (visibility != null) {
job.getConfiguration().set(CV_PROPERTY, visibility);
}
@@ -107,7 +108,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param context A context string, should be a syntactically valid URI.
*/
- public static void setDefaultContext(Job job, String context) {
+ public static void setDefaultContext(final Job job, final String context) {
if (context != null) {
job.getConfiguration().set(CONTEXT_PROPERTY, context);
}
@@ -118,7 +119,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param prefix The common prefix to all rya tables that output will be written to.
*/
- public static void setTablePrefix(Job job, String prefix) {
+ public static void setTablePrefix(final Job job, final String prefix) {
job.getConfiguration().set(OUTPUT_PREFIX_PROPERTY, prefix);
}
@@ -127,7 +128,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param enable Whether this job should add its output statements to the free text index.
*/
- public static void setFreeTextEnabled(Job job, boolean enable) {
+ public static void setFreeTextEnabled(final Job job, final boolean enable) {
job.getConfiguration().setBoolean(ENABLE_FREETEXT, enable);
}
@@ -136,7 +137,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param enable Whether this job should add its output statements to the temporal index.
*/
- public static void setTemporalEnabled(Job job, boolean enable) {
+ public static void setTemporalEnabled(final Job job, final boolean enable) {
job.getConfiguration().setBoolean(ENABLE_TEMPORAL, enable);
}
@@ -145,7 +146,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param enable Whether this job should add its output statements to the entity-centric index.
*/
- public static void setEntityEnabled(Job job, boolean enable) {
+ public static void setEntityEnabled(final Job job, final boolean enable) {
job.getConfiguration().setBoolean(ENABLE_ENTITY, enable);
}
@@ -154,7 +155,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to apply the setting to.
* @param enable Whether this job should output to the core tables.
*/
- public static void setCoreTablesEnabled(Job job, boolean enable) {
+ public static void setCoreTablesEnabled(final Job job, final boolean enable) {
job.getConfiguration().setBoolean(ENABLE_CORE, enable);
}
@@ -163,7 +164,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param job Job to configure
* @param instance Name of the mock instance
*/
- public static void setMockInstance(Job job, String instance) {
+ public static void setMockInstance(final Job job, final String instance) {
AccumuloOutputFormat.setMockInstance(job, instance);
job.getConfiguration().setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
job.getConfiguration().setBoolean(MRUtils.AC_MOCK_PROP, true);
@@ -175,8 +176,8 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @throws IOException if initializing the core Rya indexer fails.
*/
@Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException {
- Configuration conf = jobContext.getConfiguration();
+ public void checkOutputSpecs(final JobContext jobContext) throws IOException {
+ final Configuration conf = jobContext.getConfiguration();
// make sure that all of the indexers can connect
getFreeTextIndexer(conf);
getTemporalIndexer(conf);
@@ -189,7 +190,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @return A committer whose method implementations are empty.
*/
@Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
// copied from AccumuloOutputFormat
return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context);
}
@@ -201,16 +202,16 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @throws IOException if any enabled indexers can't be initialized
*/
@Override
- public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
+ public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(final TaskAttemptContext context) throws IOException {
return new RyaRecordWriter(context);
}
- private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException {
+ private static FreeTextIndexer getFreeTextIndexer(final Configuration conf) throws IOException {
if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
return null;
}
- AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
+ final AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
freeText.setConf(conf);
Connector connector;
try {
@@ -218,7 +219,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IOException("Error when attempting to create a connection for writing the freeText index.", e);
}
- MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ final MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
freeText.setConnector(connector);
freeText.setMultiTableBatchWriter(mtbw);
freeText.init();
@@ -226,11 +227,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
return freeText;
}
- private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException {
+ private static TemporalIndexer getTemporalIndexer(final Configuration conf) throws IOException {
if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
return null;
}
- AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+ final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
temporal.setConf(conf);
Connector connector;
try {
@@ -238,34 +239,34 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IOException("Error when attempting to create a connection for writing the temporal index.", e);
}
- MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+ final MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
temporal.setConnector(connector);
temporal.setMultiTableBatchWriter(mtbw);
temporal.init();
return temporal;
}
- private static EntityCentricIndex getEntityIndexer(Configuration conf) {
+ private static EntityCentricIndex getEntityIndexer(final Configuration conf) {
if (!conf.getBoolean(ENABLE_ENTITY, true)) {
return null;
}
- EntityCentricIndex entity = new EntityCentricIndex();
+ final EntityCentricIndex entity = new EntityCentricIndex();
entity.setConf(conf);
return entity;
}
- private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException {
+ private static AccumuloRyaDAO getRyaIndexer(final Configuration conf) throws IOException {
try {
if (!conf.getBoolean(ENABLE_CORE, true)) {
return null;
}
- AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
- Connector conn = ConfigUtils.getConnector(conf);
+ final AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
+ final Connector conn = ConfigUtils.getConnector(conf);
ryaIndexer.setConnector(conn);
- AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+ final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
- String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null);
+ final String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null);
if (tablePrefix != null) {
ryaConf.setTablePrefix(tablePrefix);
}
@@ -273,13 +274,13 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
ryaIndexer.setConf(ryaConf);
ryaIndexer.init();
return ryaIndexer;
- } catch (AccumuloException e) {
+ } catch (final AccumuloException e) {
logger.error("Cannot create RyaIndexer", e);
throw new IOException(e);
- } catch (AccumuloSecurityException e) {
+ } catch (final AccumuloSecurityException e) {
logger.error("Cannot create RyaIndexer", e);
throw new IOException(e);
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
logger.error("Cannot create RyaIndexer", e);
throw new IOException(e);
}
@@ -293,11 +294,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
implements Closeable, Flushable {
private static final Logger logger = Logger.getLogger(RyaRecordWriter.class);
- private FreeTextIndexer freeTextIndexer;
- private TemporalIndexer temporalIndexer;
- private EntityCentricIndex entityIndexer;
- private AccumuloRyaDAO ryaIndexer;
- private RyaTripleContext tripleContext;
+ private final FreeTextIndexer freeTextIndexer;
+ private final TemporalIndexer temporalIndexer;
+ private final EntityCentricIndex entityIndexer;
+ private final AccumuloRyaDAO ryaIndexer;
+ private final RyaTripleContext tripleContext;
private MultiTableBatchWriter writer;
private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
private RyaURI defaultContext = null;
@@ -305,10 +306,10 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
private static final long ONE_MEGABYTE = 1024L * 1024L;
private static final long AVE_STATEMENT_SIZE = 100L;
- private long bufferSizeLimit;
+ private final long bufferSizeLimit;
private long bufferCurrentSize = 0;
- private ArrayList<RyaStatement> buffer;
+ private final ArrayList<RyaStatement> buffer;
/**
* Constructor.
@@ -316,7 +317,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @throws IOException if the core Rya indexer or entity indexer can't
* be initialized
*/
- public RyaRecordWriter(TaskAttemptContext context) throws IOException {
+ public RyaRecordWriter(final TaskAttemptContext context) throws IOException {
this(context.getConfiguration());
}
@@ -326,21 +327,21 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @throws IOException if the core Rya indexer or entity indexer can't
* be initialized
*/
- public RyaRecordWriter(Configuration conf) throws IOException {
+ public RyaRecordWriter(final Configuration conf) throws IOException {
// set the visibility
- String visibility = conf.get(CV_PROPERTY);
+ final String visibility = conf.get(CV_PROPERTY);
if (visibility != null) {
- cv = visibility.getBytes();
+ cv = visibility.getBytes(StandardCharsets.UTF_8);
}
// set the default context
- String context = conf.get(CONTEXT_PROPERTY, "");
+ final String context = conf.get(CONTEXT_PROPERTY, "");
if (context != null && !context.isEmpty()) {
defaultContext = new RyaURI(context);
}
// set up the buffer
bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE);
- int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
+ final int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
buffer = new ArrayList<RyaStatement>(bufferCapacity);
// set up the indexers
@@ -358,7 +359,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IOException("Error connecting to Accumulo for entity index output", e);
}
- BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+ final BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
batchWriterConfig.setMaxMemory(RdfCloudTripleStoreConstants.MAX_MEMORY);
batchWriterConfig.setTimeout(RdfCloudTripleStoreConstants.MAX_TIME, TimeUnit.MILLISECONDS);
batchWriterConfig.setMaxWriteThreads(RdfCloudTripleStoreConstants.NUM_THREADS);
@@ -396,41 +397,45 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param paramTaskAttemptContext Unused.
*/
@Override
- public void close(TaskAttemptContext paramTaskAttemptContext) {
+ public void close(final TaskAttemptContext paramTaskAttemptContext) {
// close everything. log errors
try {
flush();
- } catch (IOException e) {
+ } catch (final IOException e) {
logger.error("Error flushing the buffer on RyaOutputFormat Close", e);
}
try {
- if (freeTextIndexer != null)
+ if (freeTextIndexer != null) {
freeTextIndexer.close();
- } catch (IOException e) {
+ }
+ } catch (final IOException e) {
logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", e);
}
try {
- if (temporalIndexer != null)
+ if (temporalIndexer != null) {
temporalIndexer.close();
- } catch (IOException e) {
+ }
+ } catch (final IOException e) {
logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", e);
}
try {
- if (entityIndexer != null)
+ if (entityIndexer != null) {
entityIndexer.close();
- } catch (IOException e) {
+ }
+ } catch (final IOException e) {
logger.error("Error closing the entityIndexer on RyaOutputFormat Close", e);
}
try {
- if (ryaIndexer != null)
+ if (ryaIndexer != null) {
ryaIndexer.destroy();
- } catch (RyaDAOException e) {
+ }
+ } catch (final RyaDAOException e) {
logger.error("Error closing RyaDAO on RyaOutputFormat Close", e);
}
if (writer != null) {
try {
writer.close();
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
logger.error("Error closing MultiTableBatchWriter on RyaOutputFormat Close", e);
}
}
@@ -442,7 +447,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param statement Statement to insert to Rya.
* @throws IOException if writing to Accumulo fails.
*/
- public void write(Statement statement) throws IOException {
+ public void write(final Statement statement) throws IOException {
write(RdfToRyaConversions.convertStatement(statement));
}
@@ -452,7 +457,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @param ryaStatement Statement to insert to Rya.
* @throws IOException if writing to Accumulo fails.
*/
- public void write(RyaStatement ryaStatement) throws IOException {
+ public void write(final RyaStatement ryaStatement) throws IOException {
write(NullWritable.get(), new RyaStatementWritable(ryaStatement, tripleContext));
}
@@ -464,8 +469,8 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
* @throws IOException if writing to Accumulo fails.
*/
@Override
- public void write(Writable key, RyaStatementWritable value) throws IOException {
- RyaStatement ryaStatement = value.getRyaStatement();
+ public void write(final Writable key, final RyaStatementWritable value) throws IOException {
+ final RyaStatement ryaStatement = value.getRyaStatement();
if (ryaStatement.getColumnVisibility() == null) {
ryaStatement.setColumnVisibility(cv);
}
@@ -479,11 +484,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
}
}
- private int statementSize(RyaStatement ryaStatement) {
- RyaURI subject = ryaStatement.getSubject();
- RyaURI predicate = ryaStatement.getPredicate();
- RyaType object = ryaStatement.getObject();
- RyaURI context = ryaStatement.getContext();
+ private int statementSize(final RyaStatement ryaStatement) {
+ final RyaURI subject = ryaStatement.getSubject();
+ final RyaURI predicate = ryaStatement.getPredicate();
+ final RyaType object = ryaStatement.getObject();
+ final RyaURI context = ryaStatement.getContext();
int size = 3 + subject.getData().length() + predicate.getData().length() + object.getData().length();
if (!XMLSchema.ANYURI.equals(object.getDataType())) {
size += 2 + object.getDataType().toString().length();
@@ -508,15 +513,15 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
totalCommitRecords += buffer.size();
commitCount++;
- long startCommitTime = System.currentTimeMillis();
+ final long startCommitTime = System.currentTimeMillis();
logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", commitCount, buffer.size(),
bufferCurrentSize));
- double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.;
+ final double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.;
totalReadDuration += readingDuration;
- double currentReadRate = buffer.size() / readingDuration;
- double totalReadRate = totalCommitRecords / totalReadDuration;
+ final double currentReadRate = buffer.size() / readingDuration;
+ final double totalReadRate = totalCommitRecords / totalReadDuration;
// Print "reading" metrics
logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration,
@@ -539,7 +544,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
entityIndexer.storeStatements(buffer);
try {
writer.flush();
- } catch (MutationsRejectedException e) {
+ } catch (final MutationsRejectedException e) {
throw new IOException("Error flushing data to Accumulo for entity indexing", e);
}
}
@@ -549,26 +554,26 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
if (ryaIndexer != null) {
ryaIndexer.add(buffer.iterator());
}
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
logger.error("Cannot write statement to Rya", e);
throw new IOException(e);
}
lastCommitFinishTime = System.currentTimeMillis();
- double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.;
+ final double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.;
totalWriteDuration += writingDuration;
- double currentWriteRate = buffer.size() / writingDuration;
- double totalWriteRate = totalCommitRecords / totalWriteDuration;
+ final double currentWriteRate = buffer.size() / writingDuration;
+ final double totalWriteRate = totalCommitRecords / totalWriteDuration;
// Print "writing" stats
logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration,
currentWriteRate, totalWriteRate));
- double processDuration = writingDuration + readingDuration;
- double totalProcessDuration = totalWriteDuration + totalReadDuration;
- double currentProcessRate = buffer.size() / processDuration;
- double totalProcessRate = totalCommitRecords / (totalProcessDuration);
+ final double processDuration = writingDuration + readingDuration;
+ final double totalProcessDuration = totalWriteDuration + totalReadDuration;
+ final double currentProcessRate = buffer.size() / processDuration;
+ final double totalProcessRate = totalCommitRecords / (totalProcessDuration);
// Print "total" stats
logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration,