Posted to commits@pirk.apache.org by ea...@apache.org on 2016/09/02 19:06:04 UTC
incubator-pirk git commit: Fix deprecated API calls and other minor fixes - closes apache/incubator-pirk#84
Repository: incubator-pirk
Updated Branches:
refs/heads/master 45b2da870 -> 5cc5e7c24
Fix deprecated API calls and other minor fixes - closes apache/incubator-pirk#84
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/5cc5e7c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/5cc5e7c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/5cc5e7c2
Branch: refs/heads/master
Commit: 5cc5e7c24728695d4c2cdc8bd88555c52e60887c
Parents: 45b2da8
Author: smarthi <sm...@apache.org>
Authored: Fri Sep 2 15:06:00 2016 -0400
Committer: eawilliams <ea...@apache.org>
Committed: Fri Sep 2 15:06:00 2016 -0400
----------------------------------------------------------------------
pom.xml | 2 +-
.../responder/wideskies/ResponderDriver.java | 4 +-
.../responder/wideskies/ResponderProps.java | 70 ++++++++++---------
.../wideskies/common/ComputeEncryptedRow.java | 10 +--
.../mapreduce/ComputeResponseTool.java | 12 ++--
.../wideskies/spark/ComputeResponse.java | 14 ++--
.../spark/EncColMultGroupedMapper.java | 2 +-
.../wideskies/spark/EncColMultReducer.java | 2 +-
.../wideskies/spark/ExpKeyFilenameMap.java | 3 +-
.../responder/wideskies/spark/FilterData.java | 2 +-
.../streaming/ComputeStreamingResponse.java | 71 +++++++++-----------
.../wideskies/storm/EncColMultBolt.java | 18 +++--
.../wideskies/storm/EncRowCalcBolt.java | 26 +++----
.../responder/wideskies/storm/OutputBolt.java | 42 ++++++------
.../wideskies/storm/PartitionDataBolt.java | 13 +---
.../wideskies/storm/PirkHashScheme.java | 15 ++---
.../responder/wideskies/storm/PirkTopology.java | 10 +--
.../wideskies/storm/StormConstants.java | 60 ++++++++---------
.../responder/wideskies/storm/StormUtils.java | 18 ++---
.../pirk/schema/data/DataSchemaLoader.java | 2 -
.../org/apache/pirk/test/utils/TestUtils.java | 8 +--
.../pirk/storm/KafkaStormIntegrationTest.java | 50 +++++++-------
22 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51e16b9..3119282 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
<hadoop.version>2.7.2</hadoop.version>
<spark.version>1.6.1</spark.version>
<apache-commons.version>3.3</apache-commons.version>
- <elasticsearch.version>2.3.3</elasticsearch.version>
+ <elasticsearch.version>2.3.4</elasticsearch.version>
<storm.version>1.0.1</storm.version>
<kafka.version>0.9.0.1</kafka.version>
<spark-streaming.version>2.0.0</spark-streaming.version>
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index f8e396b..6f34de5 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -50,9 +50,9 @@ public class ResponderDriver
{
private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
- enum Platform
+ private enum Platform
{
- MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE;
+ MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
}
public static void main(String[] args) throws Exception
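For context: the trailing semicolon after an enum's constant list is optional and only becomes necessary when the enum also declares members. A minimal sketch (the enum and its label field are illustrative, not part of Pirk):

private enum PlatformWithLabel
{
  MAPREDUCE("MapReduce"), SPARK("Spark"), STORM("Storm");  // semicolon required: members follow

  private final String label;

  PlatformWithLabel(String label)
  {
    this.label = label;
  }

  String label()
  {
    return label;
  }
}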
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 55846fd..f73fdbe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,9 +21,7 @@ package org.apache.pirk.responder.wideskies;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.cli.Option;
import org.apache.pirk.inputformat.hadoop.InputFormatConst;
import org.apache.pirk.schema.data.DataSchemaLoader;
import org.apache.pirk.schema.query.QuerySchemaLoader;
@@ -62,14 +60,14 @@ public class ResponderProps
public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
public static final String USEMODEXPJOIN = "pir.useModExpJoin";
public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
- public static final String NUMREDUCETASKS = "pir.numReduceTasks";
- public static final String MAPMEMORY = "mapreduce.map.memory.mb";
- public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
- public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
- public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
- public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+ static final String NUMREDUCETASKS = "pir.numReduceTasks";
+ static final String MAPMEMORY = "mapreduce.map.memory.mb";
+ static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+ static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+ static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+ static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
- public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+ static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
// For Spark Streaming - optional
public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
@@ -80,35 +78,35 @@ public class ResponderProps
// Storm parameters
// hdfs
- static final String HDFSURI = "hdfs.uri";
- static final String USEHDFS = "hdfs.use";
+ private static final String HDFSURI = "hdfs.uri";
+ private static final String USEHDFS = "hdfs.use";
// kafka
- static final String KAFKATOPIC = "kafka.topic";
- static final String KAFKACLIENTID = "kafka.clientId";
- static final String KAFKAZK = "kafka.zk";
- static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+ private static final String KAFKATOPIC = "kafka.topic";
+ private static final String KAFKACLIENTID = "kafka.clientId";
+ private static final String KAFKAZK = "kafka.zk";
+ private static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
// pirk topo
- static final String STORMTOPONAME = "storm.topoName";
- static final String STORMWORKERS = "storm.workers";
- static final String STORMNUMACKERS = "storm.numAckers";
- static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
- static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
- static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
- static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
- static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
- static final String STORMCHILDOPTS = "storm.worker.childOpts";
- static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
- static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
- static final String STORMSPOUTPAR = "storm.spout.parallelism";
- static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
- static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
- static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
- static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
- static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
- static final String STORMSALTCOLUMNS = "storm.saltColumns";
- static final String STORMNUMROWDIVS = "storm.rowDivs";
-
- static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
+ private static final String STORMTOPONAME = "storm.topoName";
+ private static final String STORMWORKERS = "storm.workers";
+ private static final String STORMNUMACKERS = "storm.numAckers";
+ private static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+ private static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+ private static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
+ private static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+ private static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+ private static final String STORMCHILDOPTS = "storm.worker.childOpts";
+ private static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+ private static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+ private static final String STORMSPOUTPAR = "storm.spout.parallelism";
+ private static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
+ private static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
+ private static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
+ private static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+ private static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+ private static final String STORMSALTCOLUMNS = "storm.saltColumns";
+ private static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+ private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY,
STORMSPLITPARTITIONS, STORMSALTCOLUMNS, STORMNUMROWDIVS};
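The theme of this hunk is scope narrowing: constants consumed only inside ResponderProps become private, those read elsewhere in the package become package-private, and only the genuinely shared ones stay public. A minimal sketch of the convention (property names are illustrative):

public class PropsVisibilitySketch
{
  // Read from other packages: public
  public static final String PUBLIC_PROP = "pir.publicProp";

  // Read only within this package: package-private (no modifier)
  static final String PACKAGE_PROP = "pir.packageProp";

  // Read only within this class: private
  private static final String INTERNAL_PROP = "pir.internalProp";
}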
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0065094..f63cd08 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -345,10 +345,10 @@ public class ComputeEncryptedRow
{
BigInteger part = dataPartitions.get(i);
- BigInteger exp = null;
+ BigInteger exp;
try
{
- exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+ exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
} catch (ExecutionException e)
{
e.printStackTrace();
@@ -358,7 +358,7 @@ public class ComputeEncryptedRow
logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", rowIndex, colCounter, part.toString(),
part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2));
- returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+ returnPairs.add(new Tuple2<>(colCounter, exp));
++colCounter;
}
@@ -380,13 +380,13 @@ public class ComputeEncryptedRow
BigInteger exp = null;
try
{
- exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+ exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
} catch (ExecutionException e)
{
e.printStackTrace();
}
- returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+ returnPairs.add(new Tuple2<>(colCounter, exp));
++colCounter;
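These hunks apply the Java 7 diamond operator: the compiler infers the constructor's generic type arguments from the target type, so repeating them is just noise. A minimal, self-contained sketch using scala.Tuple2 as above:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

import scala.Tuple2;

public class DiamondSketch
{
  public static void main(String[] args)
  {
    // Pre-Java 7: type arguments repeated on both sides.
    List<Tuple2<Long,BigInteger>> verbose = new ArrayList<Tuple2<Long,BigInteger>>();

    // Java 7+: the diamond lets the compiler infer them.
    List<Tuple2<Long,BigInteger>> inferred = new ArrayList<>();
    inferred.add(new Tuple2<>(0L, BigInteger.ONE));

    System.out.println(verbose.size() + " " + inferred.size());
  }
}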
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 1d06f86..ab41a47 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -110,8 +110,6 @@ public class ComputeResponseTool extends Configured implements Tool
private String esQuery = "none";
private String esResource = "none";
- String dataSchema = "none";
-
private Configuration conf = null;
private FileSystem fs = null;
@@ -264,7 +262,7 @@ public class ComputeResponseTool extends Configured implements Tool
// Run the job to generate the expTable
// Job jobExp = new Job(mrConfig.getConfig(), "pirExp-" + pirWL.getWatchlistNum());
- Job jobExp = new Job(conf, "pirExp-" + queryInfo.getIdentifier());
+ Job jobExp = Job.getInstance(conf, "pirExp-" + queryInfo.getIdentifier());
jobExp.setSpeculativeExecution(false);
jobExp.getConfiguration().set("mapreduce.map.speculative", "false");
@@ -312,7 +310,7 @@ public class ComputeResponseTool extends Configured implements Tool
// Assemble the exp table from the output
// element_index -> fileName
- HashMap<Integer,String> expFileTable = new HashMap<>();
+ Map<Integer,String> expFileTable = new HashMap<>();
FileStatus[] status = fs.listStatus(outPathExp);
for (FileStatus fstat : status)
{
@@ -352,7 +350,7 @@ public class ComputeResponseTool extends Configured implements Tool
{
boolean success;
- Job job = new Job(conf, "pirMR");
+ Job job = Job.getInstance(conf, "pirMR");
job.setSpeculativeExecution(false);
// Set the data and query schema properties
@@ -445,7 +443,7 @@ public class ComputeResponseTool extends Configured implements Tool
{
boolean success;
- Job columnMultJob = new Job(conf, "pir_columnMult");
+ Job columnMultJob = Job.getInstance(conf, "pir_columnMult");
columnMultJob.setSpeculativeExecution(false);
String columnMultJobName = "pir_columnMult";
@@ -505,7 +503,7 @@ public class ComputeResponseTool extends Configured implements Tool
{
boolean success;
- Job finalResponseJob = new Job(conf, "pir_finalResponse");
+ Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
finalResponseJob.setSpeculativeExecution(false);
String finalResponseJobName = "pir_finalResponse";
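All four hunks in this file make the same substitution: the Job(Configuration) and Job(Configuration, String) constructors are deprecated in the Hadoop 2.x line in favor of the Job.getInstance(...) factory methods, which also copy the incoming Configuration instead of holding a reference to it. A minimal sketch of the replacement pattern (the job name is illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobFactorySketch
{
  static Job newJob(Configuration conf, String name) throws IOException
  {
    // Replaces the deprecated: Job job = new Job(conf, name);
    Job job = Job.getInstance(conf, name);
    job.setSpeculativeExecution(false);
    return job;
  }
}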
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f2a54d7..d6593f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -249,7 +249,7 @@ public class ComputeResponse
JavaRDD<MapWritable> jsonRDD;
- Job job = new Job();
+ Job job = Job.getInstance();
String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
job.setJobName(jobName);
@@ -283,8 +283,7 @@ public class ComputeResponse
// Filter out by the provided stopListFile entries
if (qSchema.getFilter() != null)
{
- JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
- return filteredRDD;
+ return jsonRDD.filter(new FilterData(accum, bVars));
}
else
{
@@ -303,7 +302,7 @@ public class ComputeResponse
JavaRDD<MapWritable> jsonRDD;
- Job job = new Job();
+ Job job = Job.getInstance();
String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
job.setJobName(jobName);
job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -316,8 +315,7 @@ public class ComputeResponse
// Filter out by the provided stopListFile entries
if (qSchema.getFilter() != null)
{
- JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
- return filteredRDD;
+ return jsonRDD.filter(new FilterData(accum, bVars));
}
else
{
@@ -386,11 +384,11 @@ public class ComputeResponse
JavaPairRDD<Long,BigInteger> encColRDD;
if (colMultReduceByKey)
{
- encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+ encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
}
else
{
- encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+ encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
}
// Form the final response object
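The constructor changes drop the unused Accumulators argument; the surrounding branch also shows why colMultReduceByKey exists: reduceByKey combines values map-side before the shuffle, whereas groupByKey ships every value across the network and only then applies the mapper. A minimal sketch of reduce-side column multiplication (method and names are illustrative, not Pirk's API):

import java.math.BigInteger;

import org.apache.spark.api.java.JavaPairRDD;

public class ColumnMultSketch
{
  // Multiply all encrypted row contributions per column index, mod N^2.
  static JavaPairRDD<Long,BigInteger> multiplyColumns(JavaPairRDD<Long,BigInteger> encRows,
      BigInteger nSquared, int numPartitions)
  {
    // reduceByKey pre-aggregates within each partition before shuffling.
    return encRows.reduceByKey((a, b) -> a.multiply(b).mod(nSquared), numPartitions);
  }
}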
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 56c917c..cdab612 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -39,7 +39,7 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl
Query query = null;
- public EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
+ public EncColMultGroupedMapper(BroadcastVars bbVarsIn)
{
query = bbVarsIn.getQuery();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index 9242df7..9bde1f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -37,7 +37,7 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInt
Query query = null;
- public EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
+ public EncColMultReducer(BroadcastVars bbVarsIn)
{
query = bbVarsIn.getQuery();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
index cb95b5f..09b1e52 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
@@ -23,6 +23,7 @@ import java.io.OutputStreamWriter;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -53,7 +54,7 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction<Iterator<Tuple2<In
@Override
public Iterable<Tuple2<Integer,String>> call(Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>> iter) throws Exception
{
- ArrayList<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
+ List<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
FileSystem fs = FileSystem.get(new Configuration());
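Typing the local as List rather than ArrayList is the usual program-to-the-interface cleanup: callers depend on the collection contract, and the concrete class stays a one-line implementation detail. A minimal sketch:

import java.util.ArrayList;
import java.util.List;

public class InterfaceTypeSketch
{
  static List<String> keyFiles()
  {
    // Only this line knows the concrete class; swapping in another
    // List implementation would not ripple into any caller.
    List<String> out = new ArrayList<>();
    out.add("exp-0");
    return out;
  }
}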
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index 0a8959a..a84fd49 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -42,7 +42,7 @@ public class FilterData implements Function<MapWritable,Boolean>
private DataSchema dSchema = null;
private Object filter = null;
- public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception
+ public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn)
{
accum = accumIn;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index eaf7384..acb8682 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -18,6 +18,7 @@
*/
package org.apache.pirk.responder.wideskies.spark.streaming;
+import java.io.IOException;
import java.math.BigInteger;
import java.util.LinkedList;
import java.util.List;
@@ -46,7 +47,6 @@ import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.QuerySchemaLoader;
import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.pirk.serialization.HadoopFileSystemStore;
-import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -85,7 +85,7 @@ public class ComputeStreamingResponse
private String outputDirExp = null;
private String queryInput = null;
- QuerySchema qSchema = null;
+ private QuerySchema qSchema = null;
private String esQuery = "none";
private String esResource = "none";
@@ -94,17 +94,13 @@ public class ComputeStreamingResponse
private HadoopFileSystemStore storage = null;
private JavaStreamingContext jssc = null;
- boolean useQueueStream = false;
+ private boolean useQueueStream = false;
- private long batchSeconds = 0;
private long windowLength = 0;
private Accumulators accum = null;
private BroadcastVars bVars = null;
- private QueryInfo queryInfo = null;
- Query query = null;
-
private int numDataPartitions = 0;
private int numColMultPartitions = 0;
@@ -154,7 +150,7 @@ public class ComputeStreamingResponse
+ " esResource = " + esResource);
// Pull the batchSeconds and windowLength parameters
- batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
+ long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
if (windowLength % batchSeconds != 0)
{
@@ -191,8 +187,8 @@ public class ComputeStreamingResponse
bVars = new BroadcastVars(jssc.sparkContext());
// Set the Query and QueryInfo broadcast variables
- query = storage.recall(queryInput, Query.class);
- queryInfo = query.getQueryInfo();
+ Query query = storage.recall(queryInput, Query.class);
+ QueryInfo queryInfo = query.getQueryInfo();
bVars.setQuery(query);
bVars.setQueryInfo(queryInfo);
@@ -258,7 +254,7 @@ public class ComputeStreamingResponse
/**
* Method to read in data from an allowed input source/format and perform the query
*/
- public void performQuery() throws Exception
+ public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
{
logger.info("Performing query: ");
@@ -279,11 +275,11 @@ public class ComputeStreamingResponse
* Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
*/
@SuppressWarnings("unchecked")
- public JavaDStream<MapWritable> readData() throws ClassNotFoundException, Exception
+ public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
{
logger.info("Reading data ");
- Job job = new Job();
+ Job job = Job.getInstance();
String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
job.setJobName(jobName);
@@ -298,7 +294,7 @@ public class ComputeStreamingResponse
Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
{
- throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+ throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
}
job.setInputFormatClass(inputClass);
@@ -306,10 +302,10 @@ public class ComputeStreamingResponse
// Read data from hdfs
logger.info("useQueueStream = " + useQueueStream);
- JavaDStream<MapWritable> mwStream = null;
+ JavaDStream<MapWritable> mwStream;
if (useQueueStream)
{
- Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+ Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values()
.coalesce(numDataPartitions);
@@ -334,8 +330,7 @@ public class ComputeStreamingResponse
// Filter out by the provided stopListFile entries
if (qSchema.getFilter() != null)
{
- JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
- return filteredRDD;
+ return mwStream.filter(new FilterData(accum, bVars));
}
return mwStream;
@@ -345,11 +340,11 @@ public class ComputeStreamingResponse
* Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
*/
@SuppressWarnings("unchecked")
- public JavaDStream<MapWritable> readDataES() throws Exception
+ public JavaDStream<MapWritable> readDataES() throws IOException
{
logger.info("Reading data ");
- Job job = new Job();
+ Job job = Job.getInstance();
String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
job.setJobName(jobName);
job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -358,10 +353,10 @@ public class ComputeStreamingResponse
job.getConfiguration().set("es.query", esQuery);
// Read data from hdfs
- JavaDStream<MapWritable> mwStream = null;
+ JavaDStream<MapWritable> mwStream;
if (useQueueStream)
{
- Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+ Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
.coalesce(numDataPartitions);
rddQueue.add(rddIn);
@@ -386,8 +381,7 @@ public class ComputeStreamingResponse
// Filter out by the provided stopListFile entries
if (qSchema.getFilter() != null)
{
- JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
- return filteredRDD;
+ return mwStream.filter(new FilterData(accum, bVars));
}
else
{
@@ -401,7 +395,7 @@ public class ComputeStreamingResponse
* @throws InterruptedException
*
*/
- public void performQuery(JavaDStream<MapWritable> input) throws PIRException, InterruptedException
+ public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
{
logger.info("Performing query: ");
@@ -430,38 +424,33 @@ public class ComputeStreamingResponse
}
// Method to compute the final encrypted columns
- private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD) throws PIRException
+ private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD)
{
// Multiply the column values by colNum: emit <colNum, finalColVal>
JavaPairDStream<Long,BigInteger> encColRDD;
if (colMultReduceByKey)
{
- encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+ encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
}
else
{
- encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+ encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
}
// Update the output name, by batch number
bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
// Form and write the response object
- encColRDD.repartition(1).foreachRDD(new VoidFunction<JavaPairRDD<Long,BigInteger>>()
- {
- @Override
- public void call(JavaPairRDD<Long,BigInteger> rdd)
- {
- rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
-
- int maxBatchesVar = bVars.getMaxBatches();
- if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
- {
- logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
- System.exit(0);
- }
+ encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long, BigInteger>>) rdd -> {
+ rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
+ int maxBatchesVar = bVars.getMaxBatches();
+ if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
+ {
+ logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
+ System.exit(0);
}
+
});
}
}
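The last hunk replaces the anonymous VoidFunction class with a Java 8 lambda; since VoidFunction declares a single abstract method, the lambda is a drop-in, and the cast selects the intended foreachRDD overload. A minimal sketch of the same shape (the body is illustrative):

import java.math.BigInteger;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class ForeachRDDSketch
{
  static void printEachBatch(JavaPairDStream<Long,BigInteger> stream)
  {
    stream.foreachRDD((VoidFunction<JavaPairRDD<Long,BigInteger>>) rdd ->
        System.out.println("batch element count = " + rdd.count()));
  }
}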
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
index 8831e4e..90375aa 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -19,6 +19,10 @@
package org.apache.pirk.responder.wideskies.storm;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -28,10 +32,6 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.LoggerFactory;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Bolt class to perform encrypted column multiplication
* <p>
@@ -60,9 +60,7 @@ public class EncColMultBolt extends BaseRichBolt
private Long totalFlushSignals;
// This is the main object here. It holds column Id -> aggregated product
- private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
- private BigInteger colVal1;
- private BigInteger colMult;
+ private Map<Long,BigInteger> resultsMap = new HashMap<>();
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
@@ -93,7 +91,7 @@ public class EncColMultBolt extends BaseRichBolt
resultsMap.clear();
// Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed.
- outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new Long(-1), BigInteger.valueOf(0)));
+ outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(-1L, BigInteger.ZERO));
outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1));
numFlushSignals = 0;
}
@@ -103,13 +101,13 @@ public class EncColMultBolt extends BaseRichBolt
// Data tuple received. Do column multiplication.
long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
- colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+ BigInteger colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex);
if (resultsMap.containsKey(colIndex))
{
- colMult = colVal1.multiply(resultsMap.get(colIndex));
+ BigInteger colMult = colVal1.multiply(resultsMap.get(colIndex));
resultsMap.put(colIndex, colMult.mod(nSquared));
}
else
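Worth noting in this hunk: new Long(-1) forces a fresh allocation on every flush signal, while the literal -1L autoboxes through Long.valueOf, which serves small values from a cache (the boxed-primitive constructors were later deprecated outright in Java 9); BigInteger likewise ships reusable ZERO/ONE/TEN constants. A minimal sketch:

import java.math.BigInteger;

public class BoxingSketch
{
  public static void main(String[] args)
  {
    // Autoboxes via Long.valueOf(-1L): cached, no new allocation.
    Long flushSignal = -1L;

    // Reusable constant instead of BigInteger.valueOf(0).
    BigInteger zero = BigInteger.ZERO;

    System.out.println(flushSignal + " " + zero);
  }
}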
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
index 324fbf1..8a5b854 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -19,8 +19,17 @@
package org.apache.pirk.responder.wideskies.storm;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -29,15 +38,8 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
+import scala.Tuple2;
/**
* Bolt class to perform the encrypted row calculation
@@ -69,8 +71,8 @@ public class EncRowCalcBolt extends BaseRichBolt
private Random rand;
// These are the main data structures used here.
- private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
- private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+ private Map<Integer,Integer> hitsByRow = new HashMap<>();
+ private Map<Integer,Integer> colIndexByRow = new HashMap<>();
private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
private List<BigInteger> dataArray = new ArrayList<>();
@@ -164,8 +166,8 @@ public class EncRowCalcBolt extends BaseRichBolt
* Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
* colIndexByRow and hitsByRow appropriately.
*
- * @param tuple
- * @return
+ * @param tuple {@code Tuple}
+ * @return {@code List<Tuple2>}
*/
private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
index 68b02f3..17cbee3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -19,6 +19,18 @@
package org.apache.pirk.responder.wideskies.storm;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,17 +45,6 @@ import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.net.URI;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
/**
* Bolt to compute and output the final Response object for a query
* <p>
@@ -65,11 +66,9 @@ public class OutputBolt extends BaseRichBolt
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);
private OutputCollector outputCollector;
- private QueryInfo queryInfo;
private Response response;
private String outputFile;
private boolean hdfs;
- private String hdfsUri;
private Integer flushCounter = 0;
private List<Tuple> tuplesToAck = new ArrayList<>();
private Integer totalFlushSigs;
@@ -81,10 +80,7 @@ public class OutputBolt extends BaseRichBolt
public static CountDownLatch latch = new CountDownLatch(4);
// This is the main object here. It holds column Id -> product
- private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
-
- private BigInteger colVal;
- private BigInteger colMult;
+ private Map<Long,BigInteger> resultsMap = new HashMap<>();
private BigInteger nSquared;
@@ -100,7 +96,7 @@ public class OutputBolt extends BaseRichBolt
if (hdfs)
{
- hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+ String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
try
{
FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
@@ -116,7 +112,7 @@ public class OutputBolt extends BaseRichBolt
localStore = new LocalFileSystemStore();
}
nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
- queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+ QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
response = new Response(queryInfo);
logger.info("Intitialized OutputBolt.");
@@ -126,7 +122,7 @@ public class OutputBolt extends BaseRichBolt
public void execute(Tuple tuple)
{
long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
- colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+ BigInteger colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
// colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed it's values.
// Could have created a new stream for such signals, but that seemed like overkill.
@@ -137,12 +133,12 @@ public class OutputBolt extends BaseRichBolt
logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);
// Wait till all EncColMultBolts have been flushed
- if (flushCounter == totalFlushSigs)
+ if (Objects.equals(flushCounter, totalFlushSigs))
{
logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size());
try
{
- String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())).toString();
+ String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()));
for (long cv : resultsMap.keySet())
{
response.addElement((int) cv, resultsMap.get(cv));
@@ -182,7 +178,7 @@ public class OutputBolt extends BaseRichBolt
// in which case a small number of multiplications still need to be done per column.
if (resultsMap.containsKey(colIndex))
{
- colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+ BigInteger colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
resultsMap.put(colIndex, colMult);
}
else
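The Objects.equals change fixes a classic boxed-integer trap: flushCounter and totalFlushSigs are Integer objects, and == compares references, which only happens to work for values inside the -128..127 autobox cache. A minimal demonstration:

import java.util.Objects;

public class BoxedCompareSketch
{
  public static void main(String[] args)
  {
    Integer a = 1000;
    Integer b = 1000;

    // Reference comparison; false on a typical JVM since 1000 is outside the cache.
    System.out.println(a == b);

    // Null-safe value comparison; always true here.
    System.out.println(Objects.equals(a, b));
  }
}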
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
index 9d24620..bfa916f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -20,15 +20,11 @@
package org.apache.pirk.responder.wideskies.storm;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.query.wideskies.QueryUtils;
-
-import org.apache.pirk.schema.data.DataSchema;
-import org.apache.pirk.schema.data.DataSchemaRegistry;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.storm.task.TopologyContext;
@@ -54,8 +50,6 @@ public class PartitionDataBolt extends BaseBasicBolt
private static final long serialVersionUID = 1L;
- private QueryInfo queryInfo;
- private String queryType;
private QuerySchema qSchema = null;
private boolean embedSelector;
@@ -63,13 +57,12 @@ public class PartitionDataBolt extends BaseBasicBolt
private boolean splitPartitions;
private JSONObject json;
- private List<BigInteger> partitions;
@Override
public void prepare(Map map, TopologyContext context)
{
- queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
- queryType = queryInfo.getQueryType();
+ QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+ String queryType = queryInfo.getQueryType();
embedSelector = queryInfo.getEmbedSelector();
logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
StormUtils.initializeSchemas(map, "partition");
@@ -102,7 +95,7 @@ public class PartitionDataBolt extends BaseBasicBolt
try
{
- partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
+ List<BigInteger> partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting results - {}", json.toString(), partitions.size());
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
index 76bb80c..50d00c8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -18,28 +18,25 @@
*/
package org.apache.pirk.responder.wideskies.storm;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.pirk.utils.KeyedHash;
-
import org.apache.storm.Config;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
/**
* Scheme used by spout to retrieve and hash selector from JSON data on Kafka.
*/
@@ -51,7 +48,6 @@ public class PirkHashScheme extends StringScheme implements Scheme
private QueryInfo queryInfo;
transient private JSONParser parser;
- transient private JSONObject json;
private boolean initialized = false;
private QuerySchema qSchema;
private Config conf;
@@ -81,8 +77,9 @@ public class PirkHashScheme extends StringScheme implements Scheme
initialized = true;
}
- String str = super.deserializeString(bytes);
+ String str = deserializeString(bytes);
+ JSONObject json;
try
{
json = (JSONObject) parser.parse(str);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index 6540ecc..e0f83d3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -147,11 +147,11 @@ public class PirkTopology
conf.setDebug(false);
// conf.setNumEventLoggers(2);
- conf.put(conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024")));
- conf.put(conf.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024")));
- conf.put(conf.TOPOLOGY_TRANSFER_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32")));
- conf.put(conf.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750")));
- conf.put(conf.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
+ conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.receiveBufferSize", 1024));
+ conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.sendBufferSize", 1024));
+ conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.transferBufferSize", 32));
+ conf.put(Config.WORKER_HEAP_MEMORY_MB, SystemConfiguration.getIntProperty("storm.worker.heapMemory", 750));
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
// Pirk parameters to send to bolts
conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));
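Two cleanups here: static fields such as Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE are now referenced through the class rather than the conf instance (legal either way, but instance access disguises class-level state), and SystemConfiguration.getIntProperty replaces the getProperty-plus-Integer.parseInt dance. A minimal sketch of the first point:

public class StaticAccessSketch
{
  static final String GREETING = "hello";

  public static void main(String[] args)
  {
    StaticAccessSketch instance = new StaticAccessSketch();

    // Compiles, but reads as if GREETING were instance state.
    System.out.println(instance.GREETING);

    // Preferred: statics accessed through the class name.
    System.out.println(StaticAccessSketch.GREETING);
  }
}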
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
index 7f1e59d..6ef6bc9 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -21,49 +21,49 @@ package org.apache.pirk.responder.wideskies.storm;
public class StormConstants
{
// Topology Components
- public static final String SPOUT_ID = "kafkaspout";
- public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
- public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
- public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
- public static final String OUTPUTBOLT_ID = "outputbolt";
+ static final String SPOUT_ID = "kafkaspout";
+ static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+ static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+ static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+ static final String OUTPUTBOLT_ID = "outputbolt";
// Extra Streams
- public static final String DEFAULT = "default";
- public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
- public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
- public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+ static final String DEFAULT = "default";
+ static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+ static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+ static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
// Tuple Fields
// From HashBolt (and variants)
- public static final String HASH_FIELD = "hash";
- public static final String PARTIONED_DATA_FIELD = "parData";
- public static final String JSON_DATA_FIELD = "data";
+ static final String HASH_FIELD = "hash";
+ static final String PARTIONED_DATA_FIELD = "parData";
+ static final String JSON_DATA_FIELD = "data";
// From EncRowCalcBolt
- public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
- public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+ static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+ static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
// From EncColMultBolt
- public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
- public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+ static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+ static final String COLUMN_PRODUCT_FIELD = "colProduct";
// Configuration Keys
public static final String USE_HDFS = "useHdfs";
- public static final String HDFS_URI_KEY = "hdfsUri";
- public static final String QUERY_FILE_KEY = "queryFile";
+ static final String HDFS_URI_KEY = "hdfsUri";
+ static final String QUERY_FILE_KEY = "queryFile";
public static final String QUERY_INFO_KEY = "queryInfo";
- public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
- public static final String QSCHEMA_KEY = "qSchema";
- public static final String DSCHEMA_KEY = "dschema";
+ static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+ static final String QSCHEMA_KEY = "qSchema";
+ static final String DSCHEMA_KEY = "dschema";
public static final String OUTPUT_FILE_KEY = "output";
- public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
- public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
- public static final String SALT_COLUMNS_KEY = "saltColumns";
- public static final String ROW_DIVISIONS_KEY = "rowDivisions";
- public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+ static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+ static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+ static final String SALT_COLUMNS_KEY = "saltColumns";
+ static final String ROW_DIVISIONS_KEY = "rowDivisions";
+ static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
public static final String N_SQUARED_KEY = "nSquared";
- public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
- public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+ static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+ static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
- public static final String SALT = "salt";
- public static final String FLUSH = "flush";
+ static final String SALT = "salt";
+ static final String FLUSH = "flush";
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
index 7fbca66..bbffaba 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -18,6 +18,9 @@
*/
package org.apache.pirk.responder.wideskies.storm;
+import java.net.URI;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.pirk.query.wideskies.Query;
@@ -30,9 +33,6 @@ import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Map;
-
/**
* Utils class for the Storm implementation of Wideskies
*/
@@ -43,14 +43,14 @@ public class StormUtils
/**
* Method to read in serialized Query object from the given queryFile
*
- * @param useHdfs
- * @param hdfsUri
- * @param queryFile
- * @return
+ * @param useHdfs - true or false
+ * @param hdfsUri - HDFS path
+ * @param queryFile -
+ * @return {@link Query}
*/
public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
{
- Query query = null;
+ Query query;
try
{
@@ -77,7 +77,7 @@ public class StormUtils
* Method to read in and return a serialized Query object from the given file and initialize/load the query.schemas and data.schemas
*
* @param map
- * @return
+ * @return {@link Query}
*/
public static Query prepareQuery(Map map)
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index c651eaa..4bac8b7 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,9 +31,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
import org.apache.pirk.utils.PIRException;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/TestUtils.java b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
index be01fb4..05c9f28 100644
--- a/src/main/java/org/apache/pirk/test/utils/TestUtils.java
+++ b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
@@ -21,7 +21,6 @@ package org.apache.pirk.test.utils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
@@ -269,10 +268,9 @@ public class TestUtils
/**
* Converts the result file into an ArrayList of QueryResponseJSON objects
*
- * @throws IOException
- * @throws FileNotFoundException
+ * @throws IOException - {@link IOException}
*/
- public static List<QueryResponseJSON> readResultsFile(File file) throws FileNotFoundException, IOException
+ public static List<QueryResponseJSON> readResultsFile(File file) throws IOException
{
List<QueryResponseJSON> results = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(file)))
@@ -290,7 +288,7 @@ public class TestUtils
/**
* Write the ArrayList<String to a tmp file in the local filesystem with the given fileName
- *
+ * @throws IOException - {@link IOException}
*/
public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
{
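Dropping FileNotFoundException from the throws clause is sound because it is a subclass of IOException; declaring both tells the caller nothing extra. A minimal sketch:

import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class ThrowsSketch
{
  // new FileReader(file) throws FileNotFoundException, but since that type
  // extends IOException, declaring IOException alone covers it.
  static int firstChar(File file) throws IOException
  {
    try (FileReader reader = new FileReader(file))
    {
      return reader.read();
    }
  }
}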
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
index fc4e703..425bb0b 100644
--- a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -18,6 +18,15 @@
*/
package org.apache.pirk.storm;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
@@ -25,8 +34,8 @@ import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
-
import org.I0Itec.zkclient.ZkConnection;
+
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -38,7 +47,10 @@ import org.apache.pirk.querier.wideskies.QuerierConst;
import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
import org.apache.pirk.query.wideskies.QueryInfo;
-import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.responder.wideskies.storm.OutputBolt;
+import org.apache.pirk.responder.wideskies.storm.PirkHashScheme;
+import org.apache.pirk.responder.wideskies.storm.PirkTopology;
+import org.apache.pirk.responder.wideskies.storm.StormConstants;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.query.filter.StopListFilter;
import org.apache.pirk.schema.response.QueryResponseJSON;
@@ -61,18 +73,12 @@ import org.apache.storm.testing.TestJob;
import org.json.simple.JSONObject;
import org.junit.AfterClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Properties;
-import java.util.HashMap;
-import java.util.Arrays;
-import java.util.ArrayList;
-
@Category(IntegrationTest.class)
public class KafkaStormIntegrationTest
{
@@ -87,6 +93,9 @@ public class KafkaStormIntegrationTest
private static final String topic = "pirk_test_topic";
private static final String kafkaTmpDir = "/tmp/kafka";
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
private static File fileQuery;
private static File fileQuerier;
@@ -122,7 +131,7 @@ public class KafkaStormIntegrationTest
performEncryption();
SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
- KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+ KafkaProducer<String,String> producer = new KafkaProducer<>(createKafkaProducerConfig());
loadTestData(producer);
logger.info("Test (splitPartitions,saltColumns) = (true,true)");
@@ -260,17 +269,13 @@ public class KafkaStormIntegrationTest
zookeeperLocalCluster.stop();
FileUtils.deleteDirectory(new File(kafkaTmpDir));
-
- fileQuery.delete();
- fileQuerier.delete();
-
}
- private HashMap<String,Object> createKafkaProducerConfig()
+ private Map<String,Object> createKafkaProducerConfig()
{
String kafkaHostName = "localhost";
- Integer kafkaPorts = 11111;
- HashMap<String,Object> config = new HashMap<String,Object>();
+ int kafkaPorts = 11111;
+ Map<String,Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@@ -278,18 +283,17 @@ public class KafkaStormIntegrationTest
return config;
}
- private void loadTestData(KafkaProducer producer)
+ private void loadTestData(KafkaProducer<String,String> producer)
{
for (JSONObject dataRecord : Inputs.createJSONDataElements())
{
logger.info("Sending record to Kafka " + dataRecord.toString());
- producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+ producer.send(new ProducerRecord<>(topic, dataRecord.toString()));
}
}
private void performEncryption() throws Exception
{
- // ArrayList<String> selectors = BaseTests.selectorsDomain;
List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
String queryType = Inputs.DNS_HOSTNAME_QUERY;
@@ -307,8 +311,8 @@ public class KafkaStormIntegrationTest
logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");
// Write out files.
- fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
- fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+ fileQuerier = folder.newFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG + ".txt");
+ fileQuery = folder.newFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG + ".txt");
localStore.store(fileQuerier.getAbsolutePath(), querier);
localStore.store(fileQuery, querier.getQuery());
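The test now lets JUnit manage its scratch files: the TemporaryFolder rule creates a fresh directory before each test and deletes it afterwards, which is why the manual fileQuery.delete()/fileQuerier.delete() calls in the teardown could go. A minimal sketch of the pattern:

import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.assertTrue;

public class TempFolderSketch
{
  @Rule
  public TemporaryFolder folder = new TemporaryFolder();

  @Test
  public void createsFileInManagedFolder() throws Exception
  {
    // Created under a per-test directory; removed automatically afterwards.
    File f = folder.newFile("query.txt");
    assertTrue(f.exists());
  }
}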