Posted to commits@pirk.apache.org by ea...@apache.org on 2016/09/02 19:06:04 UTC

incubator-pirk git commit: Fix deprecated API calls and other minor fixes - closes apache/incubator-pirk#84

Repository: incubator-pirk
Updated Branches:
  refs/heads/master 45b2da870 -> 5cc5e7c24


Fix deprecated API calls and other minor fixes - closes apache/incubator-pirk#84


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/5cc5e7c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/5cc5e7c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/5cc5e7c2

Branch: refs/heads/master
Commit: 5cc5e7c24728695d4c2cdc8bd88555c52e60887c
Parents: 45b2da8
Author: smarthi <sm...@apache.org>
Authored: Fri Sep 2 15:06:00 2016 -0400
Committer: eawilliams <ea...@apache.org>
Committed: Fri Sep 2 15:06:00 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../responder/wideskies/ResponderDriver.java    |  4 +-
 .../responder/wideskies/ResponderProps.java     | 70 ++++++++++---------
 .../wideskies/common/ComputeEncryptedRow.java   | 10 +--
 .../mapreduce/ComputeResponseTool.java          | 12 ++--
 .../wideskies/spark/ComputeResponse.java        | 14 ++--
 .../spark/EncColMultGroupedMapper.java          |  2 +-
 .../wideskies/spark/EncColMultReducer.java      |  2 +-
 .../wideskies/spark/ExpKeyFilenameMap.java      |  3 +-
 .../responder/wideskies/spark/FilterData.java   |  2 +-
 .../streaming/ComputeStreamingResponse.java     | 71 +++++++++-----------
 .../wideskies/storm/EncColMultBolt.java         | 18 +++--
 .../wideskies/storm/EncRowCalcBolt.java         | 26 +++----
 .../responder/wideskies/storm/OutputBolt.java   | 42 ++++++------
 .../wideskies/storm/PartitionDataBolt.java      | 13 +---
 .../wideskies/storm/PirkHashScheme.java         | 15 ++---
 .../responder/wideskies/storm/PirkTopology.java | 10 +--
 .../wideskies/storm/StormConstants.java         | 60 ++++++++---------
 .../responder/wideskies/storm/StormUtils.java   | 18 ++---
 .../pirk/schema/data/DataSchemaLoader.java      |  2 -
 .../org/apache/pirk/test/utils/TestUtils.java   |  8 +--
 .../pirk/storm/KafkaStormIntegrationTest.java   | 50 +++++++-------
 22 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51e16b9..3119282 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
         <hadoop.version>2.7.2</hadoop.version>
         <spark.version>1.6.1</spark.version>
         <apache-commons.version>3.3</apache-commons.version>
-        <elasticsearch.version>2.3.3</elasticsearch.version>
+        <elasticsearch.version>2.3.4</elasticsearch.version>
         <storm.version>1.0.1</storm.version>
         <kafka.version>0.9.0.1</kafka.version>
         <spark-streaming.version>2.0.0</spark-streaming.version>

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index f8e396b..6f34de5 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -50,9 +50,9 @@ public class ResponderDriver
 {
   private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
 
-  enum Platform
+  private enum Platform
   {
-    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE;
+    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
   }
 
   public static void main(String[] args) throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 55846fd..f73fdbe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,9 +21,7 @@ package org.apache.pirk.responder.wideskies;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.cli.Option;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.schema.data.DataSchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
@@ -62,14 +60,14 @@ public class ResponderProps
   public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
   public static final String USEMODEXPJOIN = "pir.useModExpJoin";
   public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
-  public static final String NUMREDUCETASKS = "pir.numReduceTasks";
-  public static final String MAPMEMORY = "mapreduce.map.memory.mb";
-  public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
-  public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
-  public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
-  public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+  static final String NUMREDUCETASKS = "pir.numReduceTasks";
+  static final String MAPMEMORY = "mapreduce.map.memory.mb";
+  static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+  static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+  static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+  static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
   public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
-  public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+  static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
 
   // For Spark Streaming - optional
   public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
@@ -80,35 +78,35 @@ public class ResponderProps
 
   // Storm parameters
   // hdfs
-  static final String HDFSURI = "hdfs.uri";
-  static final String USEHDFS = "hdfs.use";
+  private static final String HDFSURI = "hdfs.uri";
+  private static final String USEHDFS = "hdfs.use";
   // kafka
-  static final String KAFKATOPIC = "kafka.topic";
-  static final String KAFKACLIENTID = "kafka.clientId";
-  static final String KAFKAZK = "kafka.zk";
-  static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+  private static final String KAFKATOPIC = "kafka.topic";
+  private static final String KAFKACLIENTID = "kafka.clientId";
+  private static final String KAFKAZK = "kafka.zk";
+  private static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
   // pirk topo
-  static final String STORMTOPONAME = "storm.topoName";
-  static final String STORMWORKERS = "storm.workers";
-  static final String STORMNUMACKERS = "storm.numAckers";
-  static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
-  static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
-  static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
-  static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
-  static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
-  static final String STORMCHILDOPTS = "storm.worker.childOpts";
-  static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
-  static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
-  static final String STORMSPOUTPAR = "storm.spout.parallelism";
-  static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
-  static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
-  static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
-  static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
-  static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
-  static final String STORMSALTCOLUMNS = "storm.saltColumns";
-  static final String STORMNUMROWDIVS = "storm.rowDivs";
-
-  static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
+  private static final String STORMTOPONAME = "storm.topoName";
+  private static final String STORMWORKERS = "storm.workers";
+  private static final String STORMNUMACKERS = "storm.numAckers";
+  private static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+  private static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+  private static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
+  private static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+  private static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+  private static final String STORMCHILDOPTS = "storm.worker.childOpts";
+  private static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+  private static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+  private static final String STORMSPOUTPAR = "storm.spout.parallelism";
+  private static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
+  private static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
+  private static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
+  private static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+  private static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+  private static final String STORMSALTCOLUMNS = "storm.saltColumns";
+  private static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+  private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
       STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
       STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY,
       STORMSPLITPARTITIONS, STORMSALTCOLUMNS, STORMNUMROWDIVS};

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0065094..f63cd08 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -345,10 +345,10 @@ public class ComputeEncryptedRow
     {
       BigInteger part = dataPartitions.get(i);
 
-      BigInteger exp = null;
+      BigInteger exp;
       try
       {
-        exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+        exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
       } catch (ExecutionException e)
       {
         e.printStackTrace();
@@ -358,7 +358,7 @@ public class ComputeEncryptedRow
       logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", rowIndex, colCounter, part.toString(),
           part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2));
 
-      returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+      returnPairs.add(new Tuple2<>(colCounter, exp));
 
       ++colCounter;
     }
@@ -380,13 +380,13 @@ public class ComputeEncryptedRow
     BigInteger exp = null;
     try
     {
-      exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+      exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
     } catch (ExecutionException e)
     {
       e.printStackTrace();
     }
 
-    returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+    returnPairs.add(new Tuple2<>(colCounter, exp));
 
     ++colCounter;
 
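The hunks above drop a redundant null initializer on exp and switch the Tuple2/Tuple3 constructions to the Java 7 diamond operator, so the generic type arguments are inferred rather than repeated. A minimal, self-contained sketch of the same idiom, using plain JDK types instead of the Scala tuple classes:

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DiamondOperatorSketch
    {
      public static void main(String[] args)
      {
        // Pre-Java 7: type arguments spelled out on both sides.
        List<BigInteger> verbose = new ArrayList<BigInteger>();

        // Java 7+: the diamond operator lets the compiler infer them,
        // both in assignments and inside method-call arguments.
        List<BigInteger> concise = new ArrayList<>();
        Map<Long,BigInteger> byColumn = new HashMap<>();

        concise.add(BigInteger.ONE);
        byColumn.put(0L, BigInteger.ONE);
        System.out.println(verbose + " " + concise + " " + byColumn);
      }
    }
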

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 1d06f86..ab41a47 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -110,8 +110,6 @@ public class ComputeResponseTool extends Configured implements Tool
   private String esQuery = "none";
   private String esResource = "none";
 
-  String dataSchema = "none";
-
   private Configuration conf = null;
   private FileSystem fs = null;
 
@@ -264,7 +262,7 @@ public class ComputeResponseTool extends Configured implements Tool
 
     // Run the job to generate the expTable
     // Job jobExp = new Job(mrConfig.getConfig(), "pirExp-" + pirWL.getWatchlistNum());
-    Job jobExp = new Job(conf, "pirExp-" + queryInfo.getIdentifier());
+    Job jobExp = Job.getInstance(conf, "pirExp-" + queryInfo.getIdentifier());
 
     jobExp.setSpeculativeExecution(false);
     jobExp.getConfiguration().set("mapreduce.map.speculative", "false");
@@ -312,7 +310,7 @@ public class ComputeResponseTool extends Configured implements Tool
 
     // Assemble the exp table from the output
     // element_index -> fileName
-    HashMap<Integer,String> expFileTable = new HashMap<>();
+    Map<Integer,String> expFileTable = new HashMap<>();
     FileStatus[] status = fs.listStatus(outPathExp);
     for (FileStatus fstat : status)
     {
@@ -352,7 +350,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;
 
-    Job job = new Job(conf, "pirMR");
+    Job job = Job.getInstance(conf, "pirMR");
     job.setSpeculativeExecution(false);
 
     // Set the data and query schema properties
@@ -445,7 +443,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;
 
-    Job columnMultJob = new Job(conf, "pir_columnMult");
+    Job columnMultJob = Job.getInstance(conf, "pir_columnMult");
     columnMultJob.setSpeculativeExecution(false);
 
     String columnMultJobName = "pir_columnMult";
@@ -505,7 +503,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;
 
-    Job finalResponseJob = new Job(conf, "pir_finalResponse");
+    Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
     finalResponseJob.setSpeculativeExecution(false);
 
     String finalResponseJobName = "pir_finalResponse";

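All of the Job constructions above move from the deprecated new Job(Configuration, String) constructor to the Job.getInstance(Configuration, String) factory method, the supported way to create a job against the newer Hadoop mapreduce API. A minimal sketch of the pattern (class and method names here are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class JobFactorySketch
    {
      // Creates a Job the non-deprecated way and mirrors the
      // speculative-execution setting applied to the jobs above.
      public static Job newJob(Configuration conf, String name) throws IOException
      {
        Job job = Job.getInstance(conf, name);
        job.setSpeculativeExecution(false);
        return job;
      }
    }
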
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f2a54d7..d6593f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -249,7 +249,7 @@ public class ComputeResponse
 
     JavaRDD<MapWritable> jsonRDD;
 
-    Job job = new Job();
+    Job job = Job.getInstance();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
     String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
@@ -283,8 +283,7 @@ public class ComputeResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return jsonRDD.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -303,7 +302,7 @@ public class ComputeResponse
 
     JavaRDD<MapWritable> jsonRDD;
 
-    Job job = new Job();
+    Job job = Job.getInstance();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
     job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -316,8 +315,7 @@ public class ComputeResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return jsonRDD.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -386,11 +384,11 @@ public class ComputeResponse
     JavaPairRDD<Long,BigInteger> encColRDD;
     if (colMultReduceByKey)
     {
-      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
     }
     else
     {
-      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
     }
 
     // Form the final response object

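The last hunk drops the unused Accumulators argument from the EncColMultReducer and EncColMultGroupedMapper constructors; the two branches it feeds illustrate the usual Spark choice between reduceByKey, which combines values per key before the shuffle, and groupByKey plus a mapToPair, which shuffles every value and combines them afterwards. A toy, self-contained sketch of the two shapes, with integer products standing in for Pirk's BigInteger column products (class, app, and variable names are made up):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class ReduceVsGroupSketch
    {
      public static void main(String[] args)
      {
        SparkConf conf = new SparkConf().setAppName("reduceVsGroup").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<Long,Integer> rows = sc.parallelizePairs(
            Arrays.asList(new Tuple2<>(0L, 2), new Tuple2<>(0L, 3), new Tuple2<>(1L, 5)));

        // reduceByKey: partial products are computed per key before the shuffle.
        JavaPairRDD<Long,Integer> reduced = rows.reduceByKey((a, b) -> a * b, 2);

        // groupByKey + mapToPair: every value is shuffled, then multiplied in one pass.
        JavaPairRDD<Long,Integer> grouped = rows.groupByKey(2).mapToPair(kv -> {
          int product = 1;
          for (int v : kv._2())
          {
            product *= v;
          }
          return new Tuple2<>(kv._1(), product);
        });

        System.out.println(reduced.collectAsMap());
        System.out.println(grouped.collectAsMap());
        sc.stop();
      }
    }

In the code above, the pir.colMultReduceByKey property selects between these two paths; reduceByKey generally shuffles far less data for keys with many values.
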
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 56c917c..cdab612 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -39,7 +39,7 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl
 
   Query query = null;
 
-  public EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultGroupedMapper(BroadcastVars bbVarsIn)
   {
 
     query = bbVarsIn.getQuery();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index 9242df7..9bde1f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -37,7 +37,7 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInt
 
   Query query = null;
 
-  public EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultReducer(BroadcastVars bbVarsIn)
   {
 
     query = bbVarsIn.getQuery();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
index cb95b5f..09b1e52 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
@@ -23,6 +23,7 @@ import java.io.OutputStreamWriter;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,7 +54,7 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction<Iterator<Tuple2<In
   @Override
   public Iterable<Tuple2<Integer,String>> call(Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>> iter) throws Exception
   {
-    ArrayList<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
+    List<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
 
     FileSystem fs = FileSystem.get(new Configuration());
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index 0a8959a..a84fd49 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -42,7 +42,7 @@ public class FilterData implements Function<MapWritable,Boolean>
   private DataSchema dSchema = null;
   private Object filter = null;
 
-  public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception
+  public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn)
   {
     accum = accumIn;
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index eaf7384..acb8682 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pirk.responder.wideskies.spark.streaming;
 
+import java.io.IOException;
 import java.math.BigInteger;
 import java.util.LinkedList;
 import java.util.List;
@@ -46,7 +47,6 @@ import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
-import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -85,7 +85,7 @@ public class ComputeStreamingResponse
   private String outputDirExp = null;
 
   private String queryInput = null;
-  QuerySchema qSchema = null;
+  private QuerySchema qSchema = null;
 
   private String esQuery = "none";
   private String esResource = "none";
@@ -94,17 +94,13 @@ public class ComputeStreamingResponse
   private HadoopFileSystemStore storage = null;
   private JavaStreamingContext jssc = null;
 
-  boolean useQueueStream = false;
+  private boolean useQueueStream = false;
 
-  private long batchSeconds = 0;
   private long windowLength = 0;
 
   private Accumulators accum = null;
   private BroadcastVars bVars = null;
 
-  private QueryInfo queryInfo = null;
-  Query query = null;
-
   private int numDataPartitions = 0;
   private int numColMultPartitions = 0;
 
@@ -154,7 +150,7 @@ public class ComputeStreamingResponse
         + " esResource = " + esResource);
 
     // Pull the batchSeconds and windowLength parameters
-    batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
+    long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
     windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
     if (windowLength % batchSeconds != 0)
     {
@@ -191,8 +187,8 @@ public class ComputeStreamingResponse
     bVars = new BroadcastVars(jssc.sparkContext());
 
     // Set the Query and QueryInfo broadcast variables
-    query = storage.recall(queryInput, Query.class);
-    queryInfo = query.getQueryInfo();
+    Query query = storage.recall(queryInput, Query.class);
+    QueryInfo queryInfo = query.getQueryInfo();
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);
 
@@ -258,7 +254,7 @@ public class ComputeStreamingResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws Exception
+  public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
   {
     logger.info("Performing query: ");
 
@@ -279,11 +275,11 @@ public class ComputeStreamingResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, Exception
+  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
   {
     logger.info("Reading data ");
 
-    Job job = new Job();
+    Job job = Job.getInstance();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
     String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
@@ -298,7 +294,7 @@ public class ComputeStreamingResponse
     Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
     if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
     {
-      throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
     }
     job.setInputFormatClass(inputClass);
 
@@ -306,10 +302,10 @@ public class ComputeStreamingResponse
 
     // Read data from hdfs
     logger.info("useQueueStream = " + useQueueStream);
-    JavaDStream<MapWritable> mwStream = null;
+    JavaDStream<MapWritable> mwStream;
     if (useQueueStream)
     {
-      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
       JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values()
           .coalesce(numDataPartitions);
 
@@ -334,8 +330,7 @@ public class ComputeStreamingResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return mwStream.filter(new FilterData(accum, bVars));
     }
 
     return mwStream;
@@ -345,11 +340,11 @@ public class ComputeStreamingResponse
    * Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readDataES() throws Exception
+  public JavaDStream<MapWritable> readDataES() throws IOException
   {
     logger.info("Reading data ");
 
-    Job job = new Job();
+    Job job = Job.getInstance();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
     job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -358,10 +353,10 @@ public class ComputeStreamingResponse
     job.getConfiguration().set("es.query", esQuery);
 
     // Read data from hdfs
-    JavaDStream<MapWritable> mwStream = null;
+    JavaDStream<MapWritable> mwStream;
     if (useQueueStream)
     {
-      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
       JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
           .coalesce(numDataPartitions);
       rddQueue.add(rddIn);
@@ -386,8 +381,7 @@ public class ComputeStreamingResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return mwStream.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -401,7 +395,7 @@ public class ComputeStreamingResponse
    * @throws InterruptedException
    * 
    */
-  public void performQuery(JavaDStream<MapWritable> input) throws PIRException, InterruptedException
+  public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
   {
     logger.info("Performing query: ");
 
@@ -430,38 +424,33 @@ public class ComputeStreamingResponse
   }
 
   // Method to compute the final encrypted columns
-  private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD) throws PIRException
+  private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD)
   {
     // Multiply the column values by colNum: emit <colNum, finalColVal>
     JavaPairDStream<Long,BigInteger> encColRDD;
     if (colMultReduceByKey)
     {
-      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
     }
     else
     {
-      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
     }
 
     // Update the output name, by batch number
     bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
 
     // Form and write the response object
-    encColRDD.repartition(1).foreachRDD(new VoidFunction<JavaPairRDD<Long,BigInteger>>()
-    {
-      @Override
-      public void call(JavaPairRDD<Long,BigInteger> rdd)
-      {
-        rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
-
-        int maxBatchesVar = bVars.getMaxBatches();
-        if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
-        {
-          logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
-          System.exit(0);
-        }
+    encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long, BigInteger>>) rdd -> {
+      rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
 
+      int maxBatchesVar = bVars.getMaxBatches();
+      if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
+      {
+        logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
+        System.exit(0);
       }
+
     });
   }
 }

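The final hunk replaces the anonymous VoidFunction passed to foreachRDD with a Java 8 lambda; the behavior is unchanged, and the explicit (VoidFunction<JavaPairRDD<Long, BigInteger>>) cast just disambiguates between foreachRDD's overloads. A minimal standalone sketch of the same refactoring against Spark's Java API (hypothetical class name, toy data):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;

    public class LambdaRefactorSketch
    {
      public static void main(String[] args)
      {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("lambdaSketch").setMaster("local[2]"));
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));

        // Pre-Java 8 style: anonymous inner class implementing VoidFunction.
        rdd.foreach(new VoidFunction<Integer>()
        {
          @Override
          public void call(Integer value)
          {
            System.out.println("value = " + value);
          }
        });

        // Java 8 lambda: same single abstract method, far less ceremony.
        rdd.foreach(value -> System.out.println("value = " + value));

        sc.stop();
      }
    }
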
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
index 8831e4e..90375aa 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -19,6 +19,10 @@
 
 package org.apache.pirk.responder.wideskies.storm;
 
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -28,10 +32,6 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.slf4j.LoggerFactory;
 
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Bolt class to perform encrypted column multiplication
  * <p>
@@ -60,9 +60,7 @@ public class EncColMultBolt extends BaseRichBolt
   private Long totalFlushSignals;
 
   // This is the main object here. It holds column Id -> aggregated product
-  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
-  private BigInteger colVal1;
-  private BigInteger colMult;
+  private Map<Long,BigInteger> resultsMap = new HashMap<>();
 
   @Override
   public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
@@ -93,7 +91,7 @@ public class EncColMultBolt extends BaseRichBolt
         resultsMap.clear();
 
         // Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed.
-        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new Long(-1), BigInteger.valueOf(0)));
+        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(-1L, BigInteger.ZERO));
         outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1));
         numFlushSignals = 0;
       }
@@ -103,13 +101,13 @@ public class EncColMultBolt extends BaseRichBolt
       // Data tuple received. Do column multiplication.
 
       long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
-      colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+      BigInteger colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
 
       logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex);
 
       if (resultsMap.containsKey(colIndex))
       {
-        colMult = colVal1.multiply(resultsMap.get(colIndex));
+        BigInteger colMult = colVal1.multiply(resultsMap.get(colIndex));
         resultsMap.put(colIndex, colMult.mod(nSquared));
       }
       else

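Alongside turning colVal1 and colMult into locals, the hunk replaces new Long(-1) with the literal -1L and BigInteger.valueOf(0) with the shared BigInteger.ZERO constant, avoiding needless allocations (the primitive-wrapper constructors are deprecated in newer JDKs). A tiny sketch of the preferred forms:

    import java.math.BigInteger;

    public class BoxingSketch
    {
      public static void main(String[] args)
      {
        Long signal = -1L;                 // autoboxes via Long.valueOf, which can reuse cached instances
        BigInteger zero = BigInteger.ZERO; // shared constant, no new allocation

        System.out.println(signal + " " + zero);
      }
    }
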
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
index 324fbf1..8a5b854 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -19,8 +19,17 @@
 
 package org.apache.pirk.responder.wideskies.storm;
 
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -29,15 +38,8 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
+import scala.Tuple2;
 
 /**
  * Bolt class to perform the encrypted row calculation
@@ -69,8 +71,8 @@ public class EncRowCalcBolt extends BaseRichBolt
   private Random rand;
 
   // These are the main data structures used here.
-  private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
-  private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+  private Map<Integer,Integer> hitsByRow = new HashMap<>();
+  private Map<Integer,Integer> colIndexByRow = new HashMap<>();
   private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
   private List<BigInteger> dataArray = new ArrayList<>();
 
@@ -164,8 +166,8 @@ public class EncRowCalcBolt extends BaseRichBolt
    * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
    * colIndexByRow and hitsByRow appropriately.
    *
-   * @param tuple
-   * @return
+   * @param tuple {@code Tuple}
+   * @return {@code List<Tuple2>}
    */
   private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
   {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
index 68b02f3..17cbee3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -19,6 +19,18 @@
 
 package org.apache.pirk.responder.wideskies.storm;
 
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,17 +45,6 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.net.URI;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Bolt to compute and output the final Response object for a query
  * <p>
@@ -65,11 +66,9 @@ public class OutputBolt extends BaseRichBolt
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);
 
   private OutputCollector outputCollector;
-  private QueryInfo queryInfo;
   private Response response;
   private String outputFile;
   private boolean hdfs;
-  private String hdfsUri;
   private Integer flushCounter = 0;
   private List<Tuple> tuplesToAck = new ArrayList<>();
   private Integer totalFlushSigs;
@@ -81,10 +80,7 @@ public class OutputBolt extends BaseRichBolt
   public static CountDownLatch latch = new CountDownLatch(4);
 
   // This is the main object here. It holds column Id -> product
-  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
-
-  private BigInteger colVal;
-  private BigInteger colMult;
+  private Map<Long,BigInteger> resultsMap = new HashMap<>();
 
   private BigInteger nSquared;
 
@@ -100,7 +96,7 @@ public class OutputBolt extends BaseRichBolt
 
     if (hdfs)
     {
-      hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+      String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
       try
       {
         FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
@@ -116,7 +112,7 @@ public class OutputBolt extends BaseRichBolt
       localStore = new LocalFileSystemStore();
     }
     nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
-    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
     response = new Response(queryInfo);
 
     logger.info("Intitialized OutputBolt.");
@@ -126,7 +122,7 @@ public class OutputBolt extends BaseRichBolt
   public void execute(Tuple tuple)
   {
     long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
-    colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+    BigInteger colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
 
     // colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed it's values.
     // Could have created a new stream for such signals, but that seemed like overkill.
@@ -137,12 +133,12 @@ public class OutputBolt extends BaseRichBolt
       logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);
 
       // Wait till all EncColMultBolts have been flushed
-      if (flushCounter == totalFlushSigs)
+      if (Objects.equals(flushCounter, totalFlushSigs))
       {
         logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size());
         try
         {
-          String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())).toString();
+          String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()));
           for (long cv : resultsMap.keySet())
           {
             response.addElement((int) cv, resultsMap.get(cv));
@@ -182,7 +178,7 @@ public class OutputBolt extends BaseRichBolt
       // in which case a small number of multiplications still need to be done per column.
       if (resultsMap.containsKey(colIndex))
       {
-        colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+        BigInteger colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
         resultsMap.put(colIndex, colMult);
       }
       else

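The switch to Objects.equals(flushCounter, totalFlushSigs) matters because both counters are boxed Integer fields: == compares references, which is only reliable for small values covered by the Integer cache. A short sketch of the pitfall:

    import java.util.Objects;

    public class BoxedCompareSketch
    {
      public static void main(String[] args)
      {
        Integer a = 1000;
        Integer b = 1000;

        System.out.println(a == b);               // false by default: reference comparison outside the -128..127 cache
        System.out.println(Objects.equals(a, b)); // true: null-safe value comparison
      }
    }
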
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
index 9d24620..bfa916f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -20,15 +20,11 @@
 package org.apache.pirk.responder.wideskies.storm;
 
 import java.math.BigInteger;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
-
-import org.apache.pirk.schema.data.DataSchema;
-import org.apache.pirk.schema.data.DataSchemaRegistry;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.storm.task.TopologyContext;
@@ -54,8 +50,6 @@ public class PartitionDataBolt extends BaseBasicBolt
 
   private static final long serialVersionUID = 1L;
 
-  private QueryInfo queryInfo;
-  private String queryType;
   private QuerySchema qSchema = null;
 
   private boolean embedSelector;
@@ -63,13 +57,12 @@ public class PartitionDataBolt extends BaseBasicBolt
   private boolean splitPartitions;
 
   private JSONObject json;
-  private List<BigInteger> partitions;
 
   @Override
   public void prepare(Map map, TopologyContext context)
   {
-    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
-    queryType = queryInfo.getQueryType();
+    QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    String queryType = queryInfo.getQueryType();
     embedSelector = queryInfo.getEmbedSelector();
     logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
     StormUtils.initializeSchemas(map, "partition");
@@ -102,7 +95,7 @@ public class PartitionDataBolt extends BaseBasicBolt
 
     try
     {
-      partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
+      List<BigInteger> partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
 
       logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting results - {}", json.toString(), partitions.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
index 76bb80c..50d00c8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -18,28 +18,25 @@
  */
 package org.apache.pirk.responder.wideskies.storm;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.utils.KeyedHash;
-
 import org.apache.storm.Config;
 import org.apache.storm.kafka.StringScheme;
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Scheme used by spout to retrieve and hash selector from JSON data on Kafka.
  */
@@ -51,7 +48,6 @@ public class PirkHashScheme extends StringScheme implements Scheme
   private QueryInfo queryInfo;
 
   transient private JSONParser parser;
-  transient private JSONObject json;
   private boolean initialized = false;
   private QuerySchema qSchema;
   private Config conf;
@@ -81,8 +77,9 @@ public class PirkHashScheme extends StringScheme implements Scheme
 
       initialized = true;
     }
-    String str = super.deserializeString(bytes);
+    String str = deserializeString(bytes);
 
+    JSONObject json;
     try
     {
       json = (JSONObject) parser.parse(str);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index 6540ecc..e0f83d3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -147,11 +147,11 @@ public class PirkTopology
     conf.setDebug(false);
     // conf.setNumEventLoggers(2);
 
-    conf.put(conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024")));
-    conf.put(conf.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024")));
-    conf.put(conf.TOPOLOGY_TRANSFER_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32")));
-    conf.put(conf.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750")));
-    conf.put(conf.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
+    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.receiveBufferSize", 1024));
+    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.sendBufferSize", 1024));
+    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.transferBufferSize", 32));
+    conf.put(Config.WORKER_HEAP_MEMORY_MB, SystemConfiguration.getIntProperty("storm.worker.heapMemory", 750));
+    conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
 
     // Pirk parameters to send to bolts
     conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));

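The hunk above references Storm's TOPOLOGY_* keys through the Config class rather than the conf instance (they are static fields) and uses SystemConfiguration.getIntProperty so the defaults are plain ints instead of parsed strings. A minimal sketch of the static-member access, assuming storm-core is on the classpath and using illustrative values:

    import org.apache.storm.Config;

    public class TopologyConfigSketch
    {
      public static void main(String[] args)
      {
        Config conf = new Config();

        // Static constants belong to the class, so reference them via Config, not an instance.
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024);
        conf.put(Config.TOPOLOGY_WORKERS, 2);

        System.out.println(conf);
      }
    }
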
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
index 7f1e59d..6ef6bc9 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -21,49 +21,49 @@ package org.apache.pirk.responder.wideskies.storm;
 public class StormConstants
 {
   // Topology Components
-  public static final String SPOUT_ID = "kafkaspout";
-  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
-  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
-  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
-  public static final String OUTPUTBOLT_ID = "outputbolt";
+  static final String SPOUT_ID = "kafkaspout";
+  static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  static final String OUTPUTBOLT_ID = "outputbolt";
 
   // Extra Streams
-  public static final String DEFAULT = "default";
-  public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
-  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
-  public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+  static final String DEFAULT = "default";
+  static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+  static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
 
   // Tuple Fields
   // From HashBolt (and variants)
-  public static final String HASH_FIELD = "hash";
-  public static final String PARTIONED_DATA_FIELD = "parData";
-  public static final String JSON_DATA_FIELD = "data";
+  static final String HASH_FIELD = "hash";
+  static final String PARTIONED_DATA_FIELD = "parData";
+  static final String JSON_DATA_FIELD = "data";
   // From EncRowCalcBolt
-  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
-  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
   // From EncColMultBolt
-  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
-  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+  static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  static final String COLUMN_PRODUCT_FIELD = "colProduct";
 
   // Configuration Keys
   public static final String USE_HDFS = "useHdfs";
-  public static final String HDFS_URI_KEY = "hdfsUri";
-  public static final String QUERY_FILE_KEY = "queryFile";
+  static final String HDFS_URI_KEY = "hdfsUri";
+  static final String QUERY_FILE_KEY = "queryFile";
   public static final String QUERY_INFO_KEY = "queryInfo";
-  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
-  public static final String QSCHEMA_KEY = "qSchema";
-  public static final String DSCHEMA_KEY = "dschema";
+  static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+  static final String QSCHEMA_KEY = "qSchema";
+  static final String DSCHEMA_KEY = "dschema";
   public static final String OUTPUT_FILE_KEY = "output";
-  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
-  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
-  public static final String SALT_COLUMNS_KEY = "saltColumns";
-  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
-  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  static final String SALT_COLUMNS_KEY = "saltColumns";
+  static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
   public static final String N_SQUARED_KEY = "nSquared";
-  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
-  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+  static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+  static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
 
-  public static final String SALT = "salt";
-  public static final String FLUSH = "flush";
+  static final String SALT = "salt";
+  static final String FLUSH = "flush";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
index 7fbca66..bbffaba 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pirk.responder.wideskies.storm;
 
+import java.net.URI;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.pirk.query.wideskies.Query;
@@ -30,9 +33,6 @@ import org.apache.storm.Constants;
 import org.apache.storm.tuple.Tuple;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Map;
-
 /**
  * Utils class for the Storm implementation of Wideskies
  */
@@ -43,14 +43,14 @@ public class StormUtils
   /**
    * Method to read in serialized Query object from the given queryFile
    * 
-   * @param useHdfs
-   * @param hdfsUri
-   * @param queryFile
-   * @return
+   * @param useHdfs - true or false
+   * @param hdfsUri - HDFS path
+   * @param queryFile -
+   * @return {@link Query}
    */
   public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
   {
-    Query query = null;
+    Query query;
 
     try
     {
@@ -77,7 +77,7 @@ public class StormUtils
    * Method to read in and return a serialized Query object from the given file and initialize/load the query.schemas and data.schemas
    * 
    * @param map
-   * @return
+   * @return {@link Query}
    */
   public static Query prepareQuery(Map map)
   {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index c651eaa..4bac8b7 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,9 +31,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/TestUtils.java b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
index be01fb4..05c9f28 100644
--- a/src/main/java/org/apache/pirk/test/utils/TestUtils.java
+++ b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
@@ -21,7 +21,6 @@ package org.apache.pirk.test.utils;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -269,10 +268,9 @@ public class TestUtils
   /**
    * Converts the result file into an ArrayList of QueryResponseJSON objects
    * 
-   * @throws IOException
-   * @throws FileNotFoundException
+   * @throws IOException - if the results file cannot be read
    */
-  public static List<QueryResponseJSON> readResultsFile(File file) throws FileNotFoundException, IOException
+  public static List<QueryResponseJSON> readResultsFile(File file) throws IOException
   {
     List<QueryResponseJSON> results = new ArrayList<>();
     try (BufferedReader br = new BufferedReader(new FileReader(file)))
@@ -290,7 +288,7 @@ public class TestUtils
 
   /**
   * Write the ArrayList<String> to a tmp file in the local filesystem with the given fileName
-   * 
+   * @throws IOException - if the tmp file cannot be written
    */
   public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
   {
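
A short sketch of exercising the two helpers with the signatures above, assuming the list entries are lines that readResultsFile can parse into QueryResponseJSON records; the file name, suffix, and contents are illustrative, and the calling code is expected to handle the declared IOException:

    // Write result lines to a local tmp file, then parse them back into QueryResponseJSON objects
    List<String> lines = Arrays.asList("{\"match\":\"a.b.c.com\"}");
    String tmpFile = TestUtils.writeToTmpFile(lines, "pir_test_results", ".txt");
    List<QueryResponseJSON> results = TestUtils.readResultsFile(new File(tmpFile));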

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
index fc4e703..425bb0b 100644
--- a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -18,6 +18,15 @@
  */
 package org.apache.pirk.storm;
 
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -25,8 +34,8 @@ import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 
 import org.I0Itec.zkclient.ZkClient;
-
 import org.I0Itec.zkclient.ZkConnection;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -38,7 +47,10 @@ import org.apache.pirk.querier.wideskies.QuerierConst;
 import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
 import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
 import org.apache.pirk.query.wideskies.QueryInfo;
-import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.responder.wideskies.storm.OutputBolt;
+import org.apache.pirk.responder.wideskies.storm.PirkHashScheme;
+import org.apache.pirk.responder.wideskies.storm.PirkTopology;
+import org.apache.pirk.responder.wideskies.storm.StormConstants;
 import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.query.filter.StopListFilter;
 import org.apache.pirk.schema.response.QueryResponseJSON;
@@ -61,18 +73,12 @@ import org.apache.storm.testing.TestJob;
 import org.json.simple.JSONObject;
 
 import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Properties;
-import java.util.HashMap;
-import java.util.Arrays;
-import java.util.ArrayList;
-
 @Category(IntegrationTest.class)
 public class KafkaStormIntegrationTest
 {
@@ -87,6 +93,9 @@ public class KafkaStormIntegrationTest
   private static final String topic = "pirk_test_topic";
   private static final String kafkaTmpDir = "/tmp/kafka";
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
   private static File fileQuery;
   private static File fileQuerier;
 
@@ -122,7 +131,7 @@ public class KafkaStormIntegrationTest
     performEncryption();
     SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
 
-    KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+    KafkaProducer<String,String> producer = new KafkaProducer<>(createKafkaProducerConfig());
     loadTestData(producer);
 
     logger.info("Test (splitPartitions,saltColumns) = (true,true)");
@@ -260,17 +269,13 @@ public class KafkaStormIntegrationTest
     zookeeperLocalCluster.stop();
 
     FileUtils.deleteDirectory(new File(kafkaTmpDir));
-
-    fileQuery.delete();
-    fileQuerier.delete();
-
   }
 
-  private HashMap<String,Object> createKafkaProducerConfig()
+  private Map<String,Object> createKafkaProducerConfig()
   {
     String kafkaHostName = "localhost";
-    Integer kafkaPorts = 11111;
-    HashMap<String,Object> config = new HashMap<String,Object>();
+    int kafkaPorts = 11111;
+    Map<String,Object> config = new HashMap<>();
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@@ -278,18 +283,17 @@ public class KafkaStormIntegrationTest
     return config;
   }
 
-  private void loadTestData(KafkaProducer producer)
+  private void loadTestData(KafkaProducer<String,String> producer)
   {
     for (JSONObject dataRecord : Inputs.createJSONDataElements())
     {
       logger.info("Sending record to Kafka " + dataRecord.toString());
-      producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+      producer.send(new ProducerRecord<>(topic, dataRecord.toString()));
     }
   }
 
   private void performEncryption() throws Exception
   {
-    // ArrayList<String> selectors = BaseTests.selectorsDomain;
     List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
     String queryType = Inputs.DNS_HOSTNAME_QUERY;
 
@@ -307,8 +311,8 @@ public class KafkaStormIntegrationTest
     logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");
 
     // Write out files.
-    fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
-    fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+    fileQuerier = folder.newFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG + ".txt");
+    fileQuery = folder.newFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG + ".txt");
 
     localStore.store(fileQuerier.getAbsolutePath(), querier);
     localStore.store(fileQuery, querier.getQuery());
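
A self-contained sketch of the JUnit TemporaryFolder rule used in this test: files obtained through folder.newFile live in a per-test directory that the rule removes automatically, which is why the explicit delete() calls above could go away; the class, method, and file names are illustrative:

    import java.io.File;

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class TemporaryFolderSketch
    {
      @Rule
      public TemporaryFolder folder = new TemporaryFolder();

      @Test
      public void createsFilesInManagedTmpDir() throws Exception
      {
        // Created under a fresh tmp directory; removed by the rule after the test completes
        File fileQuery = folder.newFile("pir_integrationTest-query.txt");
        // ... serialize the query, run the topology, assert on the decrypted results ...
      }
    }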