You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by ea...@apache.org on 2016/07/23 15:29:15 UTC
[3/4] incubator-pirk git commit: PIRK-11 Switch to SLF4J with Log4j2
bindings, including other minor cleanup - closes apache/incubator-pirk#20
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
index 2f1e069..7eb264a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,23 +15,22 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.common;
import java.math.BigInteger;
import java.util.ArrayList;
import org.apache.hadoop.io.MapWritable;
-import org.apache.log4j.Logger;
import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.utils.KeyedHash;
-import org.apache.pirk.utils.LogUtils;
import org.json.simple.JSONObject;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -40,12 +39,12 @@ import scala.Tuple2;
*/
public class HashSelectorAndPartitionData
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(HashSelectorAndPartitionData.class);
public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitionsBigInteger(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema,
QueryInfo queryInfo) throws Exception
{
- Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null;
+ Tuple2<Integer,ArrayList<BigInteger>> returnTuple;
// Pull the selector based on the query type
String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema);
@@ -56,7 +55,7 @@ public class HashSelectorAndPartitionData
// Partition by the given partitionSize
ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector());
- returnTuple = new Tuple2<Integer,ArrayList<BigInteger>>(hash, hitValPartitions);
+ returnTuple = new Tuple2<>(hash, hitValPartitions);
return returnTuple;
}
@@ -64,7 +63,7 @@ public class HashSelectorAndPartitionData
public static Tuple2<Integer,BytesArrayWritable> hashSelectorAndFormPartitions(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema,
QueryInfo queryInfo) throws Exception
{
- Tuple2<Integer,BytesArrayWritable> returnTuple = null;
+ Tuple2<Integer,BytesArrayWritable> returnTuple;
// Pull the selector based on the query type
String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema);
@@ -76,14 +75,14 @@ public class HashSelectorAndPartitionData
ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector());
BytesArrayWritable bAW = new BytesArrayWritable(hitValPartitions);
- returnTuple = new Tuple2<Integer,BytesArrayWritable>(hash, bAW);
+ returnTuple = new Tuple2<>(hash, bAW);
return returnTuple;
}
public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo) throws Exception
{
- Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null;
+ Tuple2<Integer,ArrayList<BigInteger>> returnTuple;
// Pull the selector based on the query type
String selector = QueryUtils.getSelectorByQueryTypeJSON(queryInfo.getQueryType(), json);
@@ -94,7 +93,7 @@ public class HashSelectorAndPartitionData
// Partition by the given partitionSize
ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(queryInfo.getQueryType(), json, queryInfo.getEmbedSelector());
- returnTuple = new Tuple2<Integer,ArrayList<BigInteger>>(hash, hitValPartitions);
+ returnTuple = new Tuple2<>(hash, hitValPartitions);
return returnTuple;
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
index 5aa3ffe..92993ff 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -23,9 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
import org.apache.pirk.utils.CSVOutputUtils;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Pass through mapper for encrypted column multiplication
@@ -33,10 +33,10 @@ import org.apache.pirk.utils.LogUtils;
*/
public class ColumnMultMapper extends Mapper<LongWritable,Text,LongWritable,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ColumnMultMapper.class);
- LongWritable keyOut = null;
- Text valueOut = null;
+ private LongWritable keyOut = null;
+ private Text valueOut = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
index cfca83c..abffadf 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -27,10 +27,10 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.utils.FileConst;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reducer to perform encrypted column multiplication
@@ -38,12 +38,12 @@ import org.apache.pirk.utils.LogUtils;
*/
public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ColumnMultReducer.class);
- Text outputValue = null;
+ private Text outputValue = null;
private MultipleOutputs<LongWritable,Text> mos = null;
- Query query = null;
+ private Query query = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
@@ -51,7 +51,7 @@ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Te
super.setup(ctx);
outputValue = new Text();
- mos = new MultipleOutputs<LongWritable,Text>(ctx);
+ mos = new MultipleOutputs<>(ctx);
FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 827f0b1..fb3027b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.BufferedReader;
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
-import org.apache.log4j.Logger;
import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
import org.apache.pirk.inputformat.hadoop.InputFormatConst;
@@ -52,9 +51,10 @@ import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.utils.FileConst;
import org.apache.pirk.utils.HDFS;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.SystemConfiguration;
import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tool for computing the PIR response in MapReduce
@@ -62,14 +62,14 @@ import org.elasticsearch.hadoop.mr.EsInputFormat;
* Each query run consists of three MR jobs:
* <p>
* (1) Map: Initialization mapper reads data using an extension of the BaseInputFormat or elasticsearch and, according to the QueryInfo object, extracts the
- * selector from each dataElement according to the QueryType, hashes selector, and outputs {@code<hash(selector), dataElement>}
+ * selector from each dataElement according to the QueryType, hashes selector, and outputs {@link <hash(selector), dataElement>}
* <p>
* Reduce: Calculates the encrypted row values for each selector and corresponding data element, striping across columns,and outputs each row entry by column
- * position: {@code<colNum, colVal>}
+ * position: {@link <colNum, colVal>}
* <p>
* (2) Map: Pass through mapper to aggregate by column number
* <p>
- * Reduce: Input: {@code<colnum, <colVals>>}; multiplies all colVals according to the encryption algorithm and outputs {@code<colNum, colVal>} for each colNum
+ * Reduce: Input: {@link <colnum, <colVals>>}; multiplies all colVals according to the encryption algorithm and outputs {@link <colNum, colVal>} for each colNum
* <p>
* (3) Map: Pass through mapper to move all final columns to one reducer
* <p>
@@ -89,32 +89,32 @@ import org.elasticsearch.hadoop.mr.EsInputFormat;
*/
public class ComputeResponseTool extends Configured implements Tool
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ComputeResponseTool.class);
- String dataInputFormat = null;
- String inputFile = null;
- String outputFile = null;
- String outputDirExp = null;
- String outputDirInit = null;
- String outputDirColumnMult = null;
- String outputDirFinal = null;
- String queryInputDir = null;
- String stopListFile = null;
- int numReduceTasks = 1;
+ private String dataInputFormat = null;
+ private String inputFile = null;
+ private String outputFile = null;
+ private String outputDirExp = null;
+ private String outputDirInit = null;
+ private String outputDirColumnMult = null;
+ private String outputDirFinal = null;
+ private String queryInputDir = null;
+ private String stopListFile = null;
+ private int numReduceTasks = 1;
- boolean useHDFSLookupTable = false;
+ private boolean useHDFSLookupTable = false;
- String esQuery = "none";
- String esResource = "none";
+ private String esQuery = "none";
+ private String esResource = "none";
String dataSchema = "none";
- Configuration conf = null;
- FileSystem fs = null;
+ private Configuration conf = null;
+ private FileSystem fs = null;
- Query query = null;
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
+ private Query query = null;
+ private QueryInfo queryInfo = null;
+ private QuerySchema qSchema = null;
public ComputeResponseTool() throws Exception
{
@@ -223,7 +223,7 @@ public class ComputeResponseTool extends Configured implements Tool
private boolean computeExpTable() throws IOException, ClassNotFoundException, InterruptedException
{
- boolean success = false;
+ boolean success;
logger.info("Creating expTable");
@@ -235,7 +235,7 @@ public class ComputeResponseTool extends Configured implements Tool
}
// Write the query hashes to the split files
TreeMap<Integer,BigInteger> queryElements = query.getQueryElements();
- ArrayList<Integer> keys = new ArrayList<Integer>(queryElements.keySet());
+ ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet());
int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100"));
int elementsPerSplit = (int) Math.floor(queryElements.size() / numSplits);
@@ -302,13 +302,13 @@ public class ComputeResponseTool extends Configured implements Tool
// Assemble the exp table from the output
// element_index -> fileName
- HashMap<Integer,String> expFileTable = new HashMap<Integer,String>();
+ HashMap<Integer,String> expFileTable = new HashMap<>();
FileStatus[] status = fs.listStatus(outPathExp);
for (FileStatus fstat : status)
{
- if (fstat.getPath().getName().toString().startsWith(FileConst.PIR))
+ if (fstat.getPath().getName().startsWith(FileConst.PIR))
{
- logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName().toString());
+ logger.info("fstat.getPath().getName().toString() = " + fstat.getPath().getName());
try
{
InputStreamReader isr = new InputStreamReader(fs.open(fstat.getPath()));
@@ -340,7 +340,7 @@ public class ComputeResponseTool extends Configured implements Tool
private boolean readDataEncRows(Path outPathInit) throws Exception
{
- boolean success = false;
+ boolean success;
Job job = new Job(conf, "pirMR");
job.setSpeculativeExecution(false);
@@ -432,7 +432,7 @@ public class ComputeResponseTool extends Configured implements Tool
private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException
{
- boolean success = false;
+ boolean success;
Job columnMultJob = new Job(conf, "pir_columnMult");
columnMultJob.setSpeculativeExecution(false);
@@ -460,7 +460,7 @@ public class ComputeResponseTool extends Configured implements Tool
FileStatus[] status = fs.listStatus(outPathInit);
for (FileStatus fstat : status)
{
- if (fstat.getPath().getName().toString().startsWith(FileConst.PIR))
+ if (fstat.getPath().getName().startsWith(FileConst.PIR))
{
logger.info("fstat.getPath() = " + fstat.getPath().toString());
FileInputFormat.addInputPath(columnMultJob, fstat.getPath());
@@ -492,7 +492,7 @@ public class ComputeResponseTool extends Configured implements Tool
private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException
{
- boolean success = false;
+ boolean success;
Job finalResponseJob = new Job(conf, "pir_finalResponse");
finalResponseJob.setSpeculativeExecution(false);
@@ -522,7 +522,7 @@ public class ComputeResponseTool extends Configured implements Tool
FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult));
for (FileStatus fstat : status)
{
- if (fstat.getPath().getName().toString().startsWith(FileConst.PIR_COLS))
+ if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS))
{
logger.info("fstat.getPath() = " + fstat.getPath().toString());
FileInputFormat.addInputPath(finalResponseJob, fstat.getPath());
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
index 4b0a3b3..28d49a3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -26,10 +26,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to generate the expTable given the input query vectors
@@ -37,15 +37,13 @@ import org.apache.pirk.utils.LogUtils;
*/
public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ExpTableMapper.class);
- Text keyOut = null;
- Text valueOut = null;
+ private Text valueOut = null;
- int dataPartitionBitSize = 0;
- int maxValue = 0;
- BigInteger NSquared = null;
- Query query = null;
+ private int maxValue = 0;
+ private BigInteger NSquared = null;
+ private Query query = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
@@ -58,7 +56,7 @@ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text>
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
query = Query.readFromHDFSFile(new Path(queryDir), fs);
- dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
+ int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1;
NSquared = query.getNSquared();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
index 6bbd89b..fabf679 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -23,9 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
import org.apache.pirk.utils.FileConst;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reducer class to complete the exp lookup table and add to the Query object
@@ -33,16 +33,16 @@ import org.apache.pirk.utils.LogUtils;
*/
public class ExpTableReducer extends Reducer<Text,Text,Text,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ExpTableReducer.class);
private MultipleOutputs<Text,Text> mos = null;
- String reducerID = null;
+ private String reducerID = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
super.setup(ctx);
- mos = new MultipleOutputs<Text,Text>(ctx);
+ mos = new MultipleOutputs<>(ctx);
reducerID = String.format("%05d", ctx.getTaskAttemptID().getTaskID().getId());
logger.info("reducerID = " + reducerID);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
index f9a0881..1df7b0e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -27,11 +27,11 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.response.wideskies.Response;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reducer class to construct the final Response object
@@ -39,28 +39,26 @@ import org.apache.pirk.utils.LogUtils;
*/
public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(FinalResponseReducer.class);
- Text outputValue = null;
private MultipleOutputs<LongWritable,Text> mos = null;
- Response response = null;
- String outputFile = null;
- FileSystem fs = null;
- QueryInfo queryInfo = null;
+ private Response response = null;
+ private String outputFile = null;
+ private FileSystem fs = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
super.setup(ctx);
- outputValue = new Text();
- mos = new MultipleOutputs<LongWritable,Text>(ctx);
+ Text outputValue = new Text();
+ mos = new MultipleOutputs<>(ctx);
fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
- queryInfo = query.getQueryInfo();
+ QueryInfo queryInfo = query.getQueryInfo();
outputFile = ctx.getConfiguration().get("pirMR.outputFile");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
index c3f672e..95396a9 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
@@ -37,32 +36,31 @@ import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.filter.DataFilter;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.StringUtils;
import org.apache.pirk.utils.SystemConfiguration;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
* Initialization mapper for PIR
* <p>
* Reads in data, extracts the selector by queryType from each dataElement, performs a keyed hash of the selector, extracts the partitions of the dataElement,
- * and emits {@<hash(selector), dataPartitions>}
+ * and emits {@link <hash(selector), dataPartitions>}
*
*/
public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable,IntWritable,BytesArrayWritable>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionDataMapper.class);
- IntWritable keyOut = null;
+ private IntWritable keyOut = null;
HashSet<String> stopList = null;
- Query query = null;
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
- DataSchema dSchema = null;
- Object filter = null;
+ private QueryInfo queryInfo = null;
+ private QuerySchema qSchema = null;
+ private DataSchema dSchema = null;
+ private Object filter = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
@@ -77,7 +75,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable
// Can make this so that it reads multiple queries at one time...
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
queryInfo = query.getQueryInfo();
try
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java
index fa2d8cf..cce2939 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MRStats.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,13 +15,13 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
public class MRStats
{
- public static enum Stats
+ public enum Stats
{
- NUM_RECORDS_INIT_MAPPER, NUM_RECORDS_PROCESSED_INIT_MAPPER, NUM_HASHES_REDUCER, NUM_COLUMNS;
+ NUM_RECORDS_INIT_MAPPER, NUM_RECORDS_PROCESSED_INIT_MAPPER, NUM_HASHES_REDUCER, NUM_COLUMNS
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
index 72a00e6..e35ee84 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
@@ -29,7 +29,6 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-import org.apache.log4j.Logger;
import org.apache.pirk.inputformat.hadoop.BytesArrayWritable;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
@@ -39,9 +38,9 @@ import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.utils.FileConst;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.SystemConfiguration;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -54,22 +53,20 @@ import scala.Tuple2;
*/
public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongWritable,Text>
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(RowCalcReducer.class);
- LongWritable keyOut = null;
- Text valueOut = null;
+ private LongWritable keyOut = null;
+ private Text valueOut = null;
private MultipleOutputs<LongWritable,Text> mos = null;
- FileSystem fs = null;
- Query query = null;
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
- DataSchema dSchema = null;
+ private FileSystem fs = null;
+ private Query query = null;
+ private QueryInfo queryInfo = null;
- boolean useLocalCache = false;
- boolean limitHitsPerSelector = false;
- int maxHitsPerSelector = 1000;
+ private boolean useLocalCache = false;
+ private boolean limitHitsPerSelector = false;
+ private int maxHitsPerSelector = 1000;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
@@ -78,7 +75,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW
keyOut = new LongWritable();
valueOut = new Text();
- mos = new MultipleOutputs<LongWritable,Text>(ctx);
+ mos = new MultipleOutputs<>(ctx);
fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
@@ -99,8 +96,8 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW
e.printStackTrace();
}
- qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
- dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
+ QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
+ DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
if (ctx.getConfiguration().get("pirWL.useLocalCache").equals("true"))
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
index 837c09a..1345fe5 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
@@ -20,10 +20,10 @@ package org.apache.pirk.responder.wideskies.spark;
import java.io.Serializable;
-import org.apache.log4j.Logger;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Accumulators for the Responder
@@ -33,13 +33,13 @@ public class Accumulators implements Serializable
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(Accumulators.class);
- Accumulator<Integer> numRecordsReceived = null;
- Accumulator<Integer> numRecordsFiltered = null;
- Accumulator<Integer> numRecordsAfterFilter = null;
- Accumulator<Integer> numHashes = null;
- Accumulator<Integer> numColumns = null;
+ private Accumulator<Integer> numRecordsReceived = null;
+ private Accumulator<Integer> numRecordsFiltered = null;
+ private Accumulator<Integer> numRecordsAfterFilter = null;
+ private Accumulator<Integer> numHashes = null;
+ private Accumulator<Integer> numColumns = null;
public Accumulators(JavaSparkContext sc)
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
index b4a2bd6..89ce35f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.io.Serializable;
@@ -33,19 +33,19 @@ public class BroadcastVars implements Serializable
{
private static final long serialVersionUID = 1L;
- transient JavaSparkContext jsc = null;
+ private transient JavaSparkContext jsc = null;
Broadcast<Query> query = null;
- Broadcast<QueryInfo> queryInfo = null;
+ private Broadcast<QueryInfo> queryInfo = null;
- Broadcast<String> useLocalCache = null;
+ private Broadcast<String> useLocalCache = null;
- Broadcast<Boolean> limitHitsPerSelector = null;
+ private Broadcast<Boolean> limitHitsPerSelector = null;
- Broadcast<Integer> maxHitsPerSelector = null;
+ private Broadcast<Integer> maxHitsPerSelector = null;
- Broadcast<String> expDir = null;
+ private Broadcast<String> expDir = null;
public BroadcastVars(JavaSparkContext sc)
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
index 0db43f8..938c32e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.io.IOException;
@@ -27,14 +27,13 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -43,7 +42,7 @@ import scala.Tuple2;
*/
public class ComputeExpLookupTable
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ComputeExpLookupTable.class);
/**
* Method to create the distributed modular exponentiation lookup table in hdfs for a given Query
@@ -64,7 +63,7 @@ public class ComputeExpLookupTable
public static JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> computeExpTable(JavaSparkContext sc, FileSystem fs, BroadcastVars bVars, Query query,
String queryInputFile, String outputDirExp, boolean useModExpJoin)
{
- JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations = null;
+ JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations;
logger.info("Creating expTable in hdfs for queryName = " + query.getQueryInfo().getQueryName());
@@ -83,7 +82,7 @@ public class ComputeExpLookupTable
// Write the query hashes to a RDD
TreeMap<Integer,BigInteger> queryElements = query.getQueryElements();
- ArrayList<Integer> keys = new ArrayList<Integer>(queryElements.keySet());
+ ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet());
int numSplits = Integer.parseInt(SystemConfiguration.getProperty("pir.expCreationSplits", "100"));
JavaRDD<Integer> queryHashes = sc.parallelize(keys, numSplits);
@@ -100,7 +99,7 @@ public class ComputeExpLookupTable
// Place exp table in query object and in the BroadcastVars
Map<Integer,String> queryHashFileNameMap = hashToPartition.collectAsMap();
- query.setExpFileBasedLookup(new HashMap<Integer,String>(queryHashFileNameMap));
+ query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap));
query.writeToHDFSFile(new Path(queryInputFile), fs);
bVars.setQuery(query);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 5391e41..ba7fd12 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.Logger;
import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
import org.apache.pirk.inputformat.hadoop.InputFormatConst;
import org.apache.pirk.query.wideskies.Query;
@@ -37,7 +36,6 @@ import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.SparkConf;
@@ -45,7 +43,8 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.hadoop.mr.EsInputFormat;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -62,35 +61,34 @@ import scala.Tuple2;
*/
public class ComputeResponse
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ComputeResponse.class);
- String dataInputFormat = null;
- String inputData = null;
- String outputFile = null;
- String outputDirExp = null;
+ private String dataInputFormat = null;
+ private String inputData = null;
+ private String outputFile = null;
+ private String outputDirExp = null;
- String queryInput = null;
- String stopListFile = null;
+ private String queryInput = null;
- String esQuery = "none";
- String esResource = "none";
+ private String esQuery = "none";
+ private String esResource = "none";
- boolean useHDFSLookupTable = false;
- boolean useModExpJoin = false;
+ private boolean useHDFSLookupTable = false;
+ private boolean useModExpJoin = false;
- FileSystem fs = null;
- JavaSparkContext sc = null;
+ private FileSystem fs = null;
+ private JavaSparkContext sc = null;
- Accumulators accum = null;
- BroadcastVars bVars = null;
+ private Accumulators accum = null;
+ private BroadcastVars bVars = null;
- QueryInfo queryInfo = null;
+ private QueryInfo queryInfo = null;
Query query = null;
- int numDataPartitions = 0;
- int numColMultPartitions = 0;
+ private int numDataPartitions = 0;
+ private int numColMultPartitions = 0;
- boolean colMultReduceByKey = false;
+ private boolean colMultReduceByKey = false;
public ComputeResponse(FileSystem fileSys) throws Exception
{
@@ -129,7 +127,7 @@ public class ComputeResponse
outputDirExp = outputFile + "_exp";
queryInput = SystemConfiguration.getProperty("pir.queryInput");
- stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
+ String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
useModExpJoin = SystemConfiguration.getProperty("pir.useModExpJoin", "false").equals("true");
logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
@@ -199,7 +197,7 @@ public class ComputeResponse
/**
* Method to read in data from an allowed input source/format and perform the query
*/
- public void performQuery() throws ClassNotFoundException, Exception
+ public void performQuery() throws Exception
{
logger.info("Performing query: ");
@@ -223,7 +221,7 @@ public class ComputeResponse
{
logger.info("Reading data ");
- JavaRDD<MapWritable> dataRDD = null;
+ JavaRDD<MapWritable> dataRDD;
Job job = new Job();
String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
@@ -271,7 +269,7 @@ public class ComputeResponse
{
logger.info("Reading data ");
- JavaRDD<MapWritable> dataRDD = null;
+ JavaRDD<MapWritable> dataRDD;
Job job = new Job();
String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
@@ -314,7 +312,7 @@ public class ComputeResponse
JavaPairRDD<Integer,Iterable<ArrayList<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey();
// Calculate the encrypted row values for each row, emit <colNum, colVal> for each row
- JavaPairRDD<Long,BigInteger> encRowRDD = null;
+ JavaPairRDD<Long,BigInteger> encRowRDD;
if (useModExpJoin)
{
// If we are pre-computing the modular exponentiation table and then joining the data partitions
@@ -347,7 +345,7 @@ public class ComputeResponse
private void encryptedColumnCalc(JavaPairRDD<Long,BigInteger> encRowRDD) throws PIRException
{
// Multiply the column values by colNum: emit <colNum, finalColVal>
- JavaPairRDD<Long,BigInteger> encColRDD = null;
+ JavaPairRDD<Long,BigInteger> encColRDD;
if (colMultReduceByKey)
{
encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 84d00b4..72d6b95 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,16 +15,15 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.PairFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -35,19 +34,14 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
-
- Accumulators accum = null;
- BroadcastVars bbVars = null;
+ private static final Logger logger = LoggerFactory.getLogger(EncColMultGroupedMapper.class);
Query query = null;
EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
{
- accum = accumIn;
- bbVars = bbVarsIn;
- query = bbVars.getQuery();
+ query = bbVarsIn.getQuery();
logger.info("Initialized EncColMultReducer");
}
@@ -66,6 +60,6 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl
// long endTime = System.currentTimeMillis();
// logger.debug("Completed column mult for col = " + colVals._1 + " duration = " + (endTime - startTime));
- return new Tuple2<Long,BigInteger>(colVals._1, colVal);
+ return new Tuple2<>(colVals._1, colVal);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index f6fe25a..44bce8d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,15 +15,15 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.Function2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Function to perform encrypted column multiplication
@@ -33,19 +33,14 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInt
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
-
- Accumulators accum = null;
- BroadcastVars bbVars = null;
+ private static final Logger logger = LoggerFactory.getLogger(EncColMultReducer.class);
Query query = null;
EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
{
- accum = accumIn;
- bbVars = bbVarsIn;
- query = bbVars.getQuery();
+ query = bbVarsIn.getQuery();
logger.info("Initialized EncColMultReducer");
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index af3fd44..2b28c46 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.io.IOException;
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
@@ -32,9 +31,9 @@ import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -49,36 +48,32 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(EncRowCalc.class);
- Accumulators accum = null;
- BroadcastVars bbVars = null;
+ private Accumulators accum = null;
- Query query = null;
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
- DataSchema dSchema = null;
+ private Query query = null;
+ private QueryInfo queryInfo = null;
- boolean useLocalCache = false;
- boolean limitHitsPerSelector = false;
- int maxHitsPerSelector = 0;
+ private boolean useLocalCache = false;
+ private boolean limitHitsPerSelector = false;
+ private int maxHitsPerSelector = 0;
public EncRowCalc(Accumulators pirWLAccum, BroadcastVars pirWLBBVars)
{
accum = pirWLAccum;
- bbVars = pirWLBBVars;
- query = bbVars.getQuery();
- queryInfo = bbVars.getQueryInfo();
- qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
- dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
+ query = pirWLBBVars.getQuery();
+ queryInfo = pirWLBBVars.getQueryInfo();
+ QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
+ DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
- if (bbVars.getUseLocalCache().equals("true"))
+ if (pirWLBBVars.getUseLocalCache().equals("true"))
{
useLocalCache = true;
}
- limitHitsPerSelector = bbVars.getLimitHitsPerSelector();
- maxHitsPerSelector = bbVars.getMaxHitsPerSelector();
+ limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector();
+ maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector();
logger.info("Initialized EncRowCalc - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector);
}
@@ -86,7 +81,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A
@Override
public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<ArrayList<BigInteger>>> hashDocTuple) throws Exception
{
- ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<Tuple2<Long,BigInteger>>();
+ ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
int rowIndex = hashDocTuple._1;
accum.incNumHashes(1);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
index 6f1b08b..c855aa8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,14 +15,13 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
@@ -30,9 +29,9 @@ import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -43,39 +42,33 @@ public class EncRowCalcPrecomputedCache implements
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(EncRowCalcPrecomputedCache.class);
- Accumulators accum = null;
- BroadcastVars bbVars = null;
+ private Accumulators accum = null;
Query query = null;
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
- DataSchema dSchema = null;
- boolean useLocalCache = false;
- boolean limitHitsPerSelector = false;
- int maxHitsPerSelector = 0;
- HashMap<Integer,BigInteger> expTable = null;
+ private boolean limitHitsPerSelector = false;
+ private int maxHitsPerSelector = 0;
+ private HashMap<Integer,BigInteger> expTable = null;
public EncRowCalcPrecomputedCache(Accumulators pirWLAccum, BroadcastVars pirWLBBVars)
{
accum = pirWLAccum;
- bbVars = pirWLBBVars;
- query = bbVars.getQuery();
- queryInfo = bbVars.getQueryInfo();
- qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
- dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
+ query = pirWLBBVars.getQuery();
+ QueryInfo queryInfo = pirWLBBVars.getQueryInfo();
+ QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
+ DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
- if (bbVars.getUseLocalCache().equals("true"))
+ if (pirWLBBVars.getUseLocalCache().equals("true"))
{
- useLocalCache = true;
+ boolean useLocalCache = true;
}
- limitHitsPerSelector = bbVars.getLimitHitsPerSelector();
- maxHitsPerSelector = bbVars.getMaxHitsPerSelector();
+ limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector();
+ maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector();
- expTable = new HashMap<Integer,BigInteger>();
+ expTable = new HashMap<>();
logger.info("Initialized EncRowCalcPrecomputedCache - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector);
}
@@ -84,7 +77,7 @@ public class EncRowCalcPrecomputedCache implements
public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<ArrayList<BigInteger>>>> hashDocTuple)
throws Exception
{
- ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<Tuple2<Long,BigInteger>>();
+ ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
int rowIndex = hashDocTuple._1;
accum.incNumHashes(1);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
index c417a9d..0642e22 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.io.BufferedWriter;
@@ -27,11 +27,10 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -41,15 +40,13 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction<Iterator<Tuple2<In
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ExpKeyFilenameMap.class);
- BroadcastVars bbVars = null;
- String expOutDir = null;
+ private String expOutDir = null;
public ExpKeyFilenameMap(BroadcastVars bbVarsIn)
{
- bbVars = bbVarsIn;
- expOutDir = bbVars.getExpDir();
+ expOutDir = bbVarsIn.getExpDir();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
index 7ed2a1c..b071f7b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,18 +15,17 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
import java.util.ArrayList;
-import org.apache.log4j.Logger;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -38,18 +37,16 @@ public class ExpTableGenerator implements PairFlatMapFunction<Integer,Integer,Tu
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(ExpTableGenerator.class);
- BroadcastVars bbVars = null;
Query query = null;
- BigInteger NSquared = null;
- int maxValue = 0;
+ private BigInteger NSquared = null;
+ private int maxValue = 0;
public ExpTableGenerator(BroadcastVars bbVarsIn)
{
- bbVars = bbVarsIn;
- query = bbVars.getQuery();
+ query = bbVarsIn.getQuery();
NSquared = query.getNSquared();
int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
@@ -60,14 +57,14 @@ public class ExpTableGenerator implements PairFlatMapFunction<Integer,Integer,Tu
public Iterable<Tuple2<Integer,Tuple2<Integer,BigInteger>>> call(Integer queryHashKey) throws Exception
{
// queryHashKey -> <<power>,<element^power mod N^2>>
- ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>>();
+ ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<>();
BigInteger element = query.getQueryElement(queryHashKey);
for (int i = 0; i <= maxValue; ++i)
{
BigInteger modPow = ModPowAbstraction.modPow(element, BigInteger.valueOf(i), NSquared);
- Tuple2<Integer,BigInteger> modPowTuple = new Tuple2<Integer,BigInteger>(i, modPow);
- modExp.add(new Tuple2<Integer,Tuple2<Integer,BigInteger>>(queryHashKey, modPowTuple));
+ Tuple2<Integer,BigInteger> modPowTuple = new Tuple2<>(i, modPow);
+ modExp.add(new Tuple2<>(queryHashKey, modPowTuple));
}
return modExp;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index 3eb37f6..2a54a38 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,19 +15,19 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import org.apache.hadoop.io.MapWritable;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.filter.DataFilter;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to filter data as per the provided Filter (via the QuerySchema)
@@ -36,19 +36,17 @@ public class FilterData implements Function<MapWritable,Boolean>
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(FilterData.class);
- Accumulators accum = null;
- BroadcastVars bbVars = null;
- DataSchema dSchema = null;
- Object filter = null;
+ private Accumulators accum = null;
+ private DataSchema dSchema = null;
+ private Object filter = null;
public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception
{
accum = accumIn;
- bbVars = bbVarsIn;
- QueryInfo queryInfo = bbVars.getQueryInfo();
+ QueryInfo queryInfo = bbVarsIn.getQueryInfo();
QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
index b614d42..bbd0edd 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,23 +15,22 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
import java.util.ArrayList;
import org.apache.hadoop.io.MapWritable;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.responder.wideskies.common.HashSelectorAndPartitionData;
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
-import org.apache.pirk.utils.LogUtils;
import org.apache.spark.api.java.function.PairFunction;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -43,21 +42,15 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionData.class);
- Accumulators accum = null;
- BroadcastVars bbVars = null;
-
- QueryInfo queryInfo = null;
- QuerySchema qSchema = null;
- DataSchema dSchema = null;
+ private QueryInfo queryInfo = null;
+ private QuerySchema qSchema = null;
+ private DataSchema dSchema = null;
public HashSelectorsAndPartitionData(Accumulators pirWLAccum, BroadcastVars pirWLBBVars)
{
- accum = pirWLAccum;
- bbVars = pirWLBBVars;
-
- queryInfo = bbVars.getQueryInfo();
+ queryInfo = pirWLBBVars.getQueryInfo();
qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
@@ -67,7 +60,7 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I
@Override
public Tuple2<Integer,ArrayList<BigInteger>> call(MapWritable doc) throws Exception
{
- Tuple2<Integer,ArrayList<BigInteger>> returnTuple = null;
+ Tuple2<Integer,ArrayList<BigInteger>> returnTuple;
// Extract the selector, compute the hash, and partition the data element according to query type
returnTuple = HashSelectorAndPartitionData.hashSelectorAndFormPartitionsBigInteger(doc, qSchema, dSchema, queryInfo);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
index d851d70..879b618 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,28 +15,27 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.standalone;
import java.io.BufferedReader;
-import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.TreeMap;
-import org.apache.log4j.Logger;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.utils.KeyedHash;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.SystemConfiguration;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to perform stand alone responder functionalities
@@ -51,18 +50,18 @@ import org.json.simple.parser.JSONParser;
*/
public class Responder
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(Responder.class);
- Query query = null;
- QueryInfo queryInfo = null;
+ private Query query = null;
+ private QueryInfo queryInfo = null;
- String queryType = null;
+ private String queryType = null;
- Response response = null;
+ private Response response = null;
- TreeMap<Integer,BigInteger> columns = null; // the column values for the PIR calculations
+ private TreeMap<Integer,BigInteger> columns = null; // the column values for the PIR calculations
- ArrayList<Integer> rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector
+ private ArrayList<Integer> rowColumnCounters; // keeps track of how many hit partitions have been recorded for each row/selector
public Responder(Query queryInput)
{
@@ -73,10 +72,10 @@ public class Responder
response = new Response(queryInfo);
// Columns are allocated as needed, initialized to 1
- columns = new TreeMap<Integer,BigInteger>();
+ columns = new TreeMap<>();
// Initialize row counters
- rowColumnCounters = new ArrayList<Integer>();
+ rowColumnCounters = new ArrayList<>();
for (int i = 0; i < Math.pow(2, queryInfo.getHashBitSize()); ++i)
{
rowColumnCounters.add(0);
@@ -178,7 +177,7 @@ public class Responder
BigInteger column = columns.get(i + rowCounter); // the next 'free' column relative to the selector
logger.debug("Before: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter));
- BigInteger exp = null;
+ BigInteger exp;
if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) // using the standalone
// lookup table
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/response/wideskies/Response.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java
index 6127b06..3d2a3c0 100644
--- a/src/main/java/org/apache/pirk/response/wideskies/Response.java
+++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.response.wideskies;
import java.io.File;
@@ -30,9 +30,9 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
import org.apache.pirk.query.wideskies.QueryInfo;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to hold the encrypted response elements for the PIR query
@@ -44,17 +44,17 @@ public class Response implements Serializable
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(Response.class);
- QueryInfo queryInfo = null; // holds all query info
+ private QueryInfo queryInfo = null; // holds all query info
- TreeMap<Integer,BigInteger> responseElements = null; // encrypted response columns, colNum -> column
+ private TreeMap<Integer,BigInteger> responseElements = null; // encrypted response columns, colNum -> column
public Response(QueryInfo queryInfoInput)
{
queryInfo = queryInfoInput;
- responseElements = new TreeMap<Integer,BigInteger>();
+ responseElements = new TreeMap<>();
}
public TreeMap<Integer,BigInteger> getResponseElements()
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
index 11c3c8e..e0512bb 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.schema.data;
import java.io.Serializable;
@@ -23,10 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
-import org.apache.pirk.utils.LogUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to hold a data schema
@@ -35,21 +35,21 @@ public class DataSchema implements Serializable
{
private static final long serialVersionUID = 1L;
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(DataSchema.class);
- String schemaName = null;
+ private String schemaName = null;
- String primitiveTypePartitionerName = null;
+ private String primitiveTypePartitionerName = null;
- transient HashMap<String,Text> textRep = null; // string element name -> Text representation
+ private transient HashMap<String,Text> textRep = null; // string element name -> Text representation
- transient HashMap<String,Object> partitionerInstances = null; // partitioner class name -> Text representation
+ private transient HashMap<String,Object> partitionerInstances = null; // partitioner class name -> Text representation
- HashMap<String,String> typeMap = null; // string element name -> java type
+ private HashMap<String,String> typeMap = null; // string element name -> java type
- HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name
+ private HashMap<String,String> partitionerMap = null; // string element name -> partitioner class name
- HashSet<String> listRep = null; // elements that are list/array types
+ private HashSet<String> listRep = null; // elements that are list/array types
public DataSchema(String schemaNameInput, HashMap<String,Text> textRepInput, HashSet<String> listRepInput, HashMap<String,String> typeMapInput,
HashMap<String,String> partitionerMapInput)
@@ -78,7 +78,7 @@ public class DataSchema implements Serializable
private void constructTextRep()
{
- textRep = new HashMap<String,Text>();
+ textRep = new HashMap<>();
for (String name : typeMap.keySet())
{
textRep.put(name, new Text(name));
@@ -101,7 +101,7 @@ public class DataSchema implements Serializable
private void constructPartitionerInstances() throws Exception
{
- partitionerInstances = new HashMap<String,Object>();
+ partitionerInstances = new HashMap<>();
for (String partitionerName : partitionerMap.values())
{
if (!partitionerInstances.containsKey(partitionerName))
@@ -202,7 +202,7 @@ public class DataSchema implements Serializable
public HashSet<String> getNonListRep()
{
- HashSet<String> elements = new HashSet<String>();
+ HashSet<String> elements = new HashSet<>();
elements.addAll(textRep.keySet());
elements.removeAll(listRep);
return elements;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/ef8d1c1a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
index 5bbe754..73995e8 100644
--- a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
+++ b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.schema.data;
import java.io.File;
@@ -30,11 +30,11 @@ import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
-import org.apache.pirk.utils.LogUtils;
import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -64,11 +64,11 @@ import org.w3c.dom.NodeList;
*/
public class LoadDataSchemas
{
- private static Logger logger = LogUtils.getLoggerForThisClass();
+ private static final Logger logger = LoggerFactory.getLogger(LoadDataSchemas.class);
- public static HashMap<String,DataSchema> schemaMap;
+ private static HashMap<String,DataSchema> schemaMap;
- public static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<String>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
+ private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE,
PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
@@ -76,7 +76,7 @@ public class LoadDataSchemas
{
logger.info("Loading data schemas: ");
- schemaMap = new HashMap<String,DataSchema>();
+ schemaMap = new HashMap<>();
try
{
initialize();
@@ -126,21 +126,21 @@ public class LoadDataSchemas
private static DataSchema loadDataSchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception
{
- DataSchema dataSchema = null;
+ DataSchema dataSchema;
// Initialize the elements needed to create the DataSchema
- String schemaName = null;
- HashMap<String,Text> textRep = new HashMap<String,Text>();
- HashSet<String> listRep = new HashSet<String>();
- HashMap<String,String> typeMap = new HashMap<String,String>();
- HashMap<String,String> partitionerMap = new HashMap<String,String>();
- HashMap<String,Object> partitionerInstances = new HashMap<String,Object>();
+ String schemaName;
+ HashMap<String,Text> textRep = new HashMap<>();
+ HashSet<String> listRep = new HashSet<>();
+ HashMap<String,String> typeMap = new HashMap<>();
+ HashMap<String,String> partitionerMap = new HashMap<>();
+ HashMap<String,Object> partitionerInstances = new HashMap<>();
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
// Read in and parse the schema file
- Document doc = null;
+ Document doc;
if (hdfs)
{
Path filePath = new Path(schemaFile);