You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by ea...@apache.org on 2016/09/25 00:53:57 UTC
[02/10] incubator-pirk git commit: [PIRK-68] - Add the Capability for
Generic Fields in the Query Schema,
updated javadocs -- closes apache/incubator-pirk#101
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java
index 024a89d..a8f26cb 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java
@@ -163,20 +163,22 @@ public class QuerierCLI
options.addOption(optionACTION);
// INPUTFILE
- Option optionINPUTFILE = new Option("i", QuerierProps.INPUTFILE, true, "required - Fully qualified file containing input "
- + "-- \n The input is either: \n (1) For Encryption: A query file - Contains the query selectors, one per line; "
- + "the first line must be the query number \n OR \n (2) For Decryption: A response file - Contains the serialized Response object");
+ Option optionINPUTFILE = new Option("i", QuerierProps.INPUTFILE, true,
+ "required - Fully qualified file containing input "
+ + "-- \n The input is either: \n (1) For Encryption: A query file - Contains the query selectors, one per line; "
+ + "the first line must be the query number \n OR \n (2) For Decryption: A response file - Contains the serialized Response object");
optionINPUTFILE.setRequired(false);
optionINPUTFILE.setArgName(QuerierProps.INPUTFILE);
optionINPUTFILE.setType(String.class);
options.addOption(optionINPUTFILE);
// OUTPUTFILE
- Option optionOUTPUTFILE = new Option("o", QuerierProps.OUTPUTFILE, true, "required - Fully qualified file for the result output. "
- + "\n The output file specifies either: \n (1) For encryption: \n \t (a) A file to contain the serialized Querier object named: " + "<outputFile>-"
- + QuerierConst.QUERIER_FILETAG + " AND \n \t " + "(b) A file to contain the serialized Query object named: <outputFile>-" + QuerierConst.QUERY_FILETAG
- + "\n " + "OR \n (2) A file to contain the decryption results where each line is where each line "
- + "corresponds to one hit and is a JSON object with the schema QuerySchema");
+ Option optionOUTPUTFILE = new Option("o", QuerierProps.OUTPUTFILE, true,
+ "required - Fully qualified file for the result output. "
+ + "\n The output file specifies either: \n (1) For encryption: \n \t (a) A file to contain the serialized Querier object named: " + "<outputFile>-"
+ + QuerierConst.QUERIER_FILETAG + " AND \n \t " + "(b) A file to contain the serialized Query object named: <outputFile>-"
+ + QuerierConst.QUERY_FILETAG + "\n " + "OR \n (2) A file to contain the decryption results where each line is where each line "
+ + "corresponds to one hit and is a JSON object with the schema QuerySchema");
optionOUTPUTFILE.setRequired(false);
optionOUTPUTFILE.setArgName(QuerierProps.OUTPUTFILE);
optionOUTPUTFILE.setType(String.class);
@@ -204,8 +206,8 @@ public class QuerierCLI
options.addOption(optionQuerySchemas);
// TYPE
- Option optionTYPE = new Option("qt", QuerierProps.QUERYTYPE, true, "required for encryption -- Type of the query as defined "
- + "in the 'schemaName' tag of the corresponding query schema file");
+ Option optionTYPE = new Option("qt", QuerierProps.QUERYTYPE, true,
+ "required for encryption -- Type of the query as defined " + "in the 'schemaName' tag of the corresponding query schema file");
optionTYPE.setRequired(false);
optionTYPE.setArgName(QuerierProps.QUERYTYPE);
optionTYPE.setType(String.class);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index c1a4e4a..0338281 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -153,9 +153,9 @@ public class QuerierDriver implements Serializable
// Perform the action
if (action.equals("encrypt"))
{
- logger.info("Performing encryption: \n inputFile = " + inputFile + "\n outputFile = " + outputFile + "\n numThreads = " + numThreads
- + "\n hashBitSize = " + hashBitSize + "\n hashKey = " + hashKey + "\n dataPartitionBitSize = " + dataPartitionBitSize + "\n paillierBitSize = "
- + paillierBitSize + "\n certainty = " + certainty);
+ logger.info("Performing encryption: \n inputFile = " + inputFile + "\n outputFile = " + outputFile + "\n numThreads = " + numThreads + "\n hashBitSize = "
+ + hashBitSize + "\n hashKey = " + hashKey + "\n dataPartitionBitSize = " + dataPartitionBitSize + "\n paillierBitSize = " + paillierBitSize
+ + "\n certainty = " + certainty);
// Read in the selectors and extract the queryIdentifier - first line in the file
ArrayList<String> selectors = FileIOUtils.readToArrayList(inputFile);
@@ -182,8 +182,8 @@ public class QuerierDriver implements Serializable
BigInteger val = (BigInteger.valueOf(2)).pow(exp);
if (val.compareTo(paillier.getN()) != -1)
{
- logger.error("The number of selectors = " + numSelectors + " must be such that " + "2^{numSelector*dataPartitionBitSize} < N = "
- + paillier.getN().toString(2));
+ logger.error(
+ "The number of selectors = " + numSelectors + " must be such that " + "2^{numSelector*dataPartitionBitSize} < N = " + paillier.getN().toString(2));
System.exit(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
index 1482137..06bfa28 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
@@ -72,8 +72,8 @@ public class QueryInfo implements Serializable, Cloneable
embedSelectorInput, useHDFSExpLookupTableInput);
}
- public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInput, String hashKeyInput, int dataPartitionBitSizeInput,
- String queryTypeInput, boolean useExpLookupTableInput, boolean embedSelectorInput, boolean useHDFSExpLookupTableInput)
+ public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInput, String hashKeyInput, int dataPartitionBitSizeInput, String queryTypeInput,
+ boolean useExpLookupTableInput, boolean embedSelectorInput, boolean useHDFSExpLookupTableInput)
{
identifier = identifierInput;
queryType = queryTypeInput;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index 514491a..e08f2da 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -111,7 +111,7 @@ public class ResponderCLI
* Method to parse and validate the options provided
*
* @return - true if valid, false otherwise
- * @throws IOException
+ * @throws IOException
*/
private boolean parseOptions() throws IOException
{
@@ -122,15 +122,15 @@ public class ResponderCLI
{
SystemConfiguration.loadPropsFromFile(new File(getOptionValue(LOCALPROPFILE)));
}
- else if(hasOption(HDFSPROPDIR))
+ else if (hasOption(HDFSPROPDIR))
{
- FileSystem fs = FileSystem.get(new Configuration());
- SystemConfiguration.loadPropsFromHDFSDir(getOptionValue(HDFSPROPDIR), fs);
+ FileSystem fs = FileSystem.get(new Configuration());
+ SystemConfiguration.loadPropsFromHDFSDir(getOptionValue(HDFSPROPDIR), fs);
}
- else if(hasOption(HDFSPROPFILE))
+ else if (hasOption(HDFSPROPFILE))
{
- FileSystem fs = FileSystem.get(new Configuration());
- SystemConfiguration.loadPropsFromFile(getOptionValue(HDFSPROPFILE), fs);
+ FileSystem fs = FileSystem.get(new Configuration());
+ SystemConfiguration.loadPropsFromFile(getOptionValue(HDFSPROPFILE), fs);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 02dbf2e..0e1561c 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -57,12 +57,10 @@ public class ResponderDriver
{
responder.run();
}
- }
- catch (PIRException pirEx)
+ } catch (PIRException pirEx)
{
logger.error("Failed to load platform plugin: {}! {}", platformName, pirEx.getMessage());
- }
- catch (Exception ex)
+ } catch (Exception ex)
{
logger.error("Failed to run platform plugin: {}! {}", platformName, ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index a9eb80d..64d0e6a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -106,15 +106,16 @@ public class ResponderProps
private static final String STORMSALTCOLUMNS = "storm.saltColumns";
private static final String STORMNUMROWDIVS = "storm.rowDivs";
- private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
- STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
- STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY,
+ private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME,
+ STORMWORKERS, STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS,
+ STORMMAXWORKERHEAP, STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY,
STORMSPLITPARTITIONS, STORMSALTCOLUMNS, STORMNUMROWDIVS};
- static final List<String> PROPSLIST = Arrays.asList((String[]) ArrayUtils.addAll(new String[] {PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY,
- ESRESOURCE, ESQUERY, ESNODES, OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY,
- REDUCEMEMORY, MAPJAVAOPTS, REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS,
- USEMODEXPJOIN, COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS));
+ static final List<String> PROPSLIST = Arrays
+ .asList((String[]) ArrayUtils.addAll(new String[] {PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, OUTPUTFILE,
+ BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
+ REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
+ COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS));
/**
* Validates the responder properties
@@ -169,7 +170,11 @@ public class ResponderProps
if (!SystemConfiguration.hasProperty(BASEQUERY))
{
- SystemConfiguration.setProperty("BASEQUERY", "?q=*");
+ SystemConfiguration.setProperty(BASEQUERY, "?q=*");
+ }
+ else if (!SystemConfiguration.getProperty(BASEQUERY).startsWith("?q"))
+ {
+ SystemConfiguration.setProperty(BASEQUERY, "?q=*");
}
}
else if (dataInputFormat.equals(InputFormatConst.ES))
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
index 129b1c9..677c1cb 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
@@ -53,7 +53,7 @@ public class ResponderService
{
try
{
- for(ResponderPlugin plugin : loader)
+ for (ResponderPlugin plugin : loader)
{
if (platformName.equalsIgnoreCase(plugin.getPlatformName()))
{
@@ -61,8 +61,7 @@ public class ResponderService
return plugin;
}
}
- }
- catch (ServiceConfigurationError e)
+ } catch (ServiceConfigurationError e)
{
logger.error("ResponderPlugin configuration error {}", e);
throw new PIRException(e);
@@ -70,4 +69,4 @@ public class ResponderService
return null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
index fcbc88b..c5844f0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -32,7 +32,8 @@ public class MapReduceResponder implements ResponderPlugin
private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class);
@Override
- public String getPlatformName() {
+ public String getPlatformName()
+ {
return "mapreduce";
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 00bc5c1..add79a4 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -140,8 +140,8 @@ public class ComputeResponse
String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
useModExpJoin = SystemConfiguration.getBooleanProperty("pir.useModExpJoin", false);
- logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
- + " esResource = " + esResource);
+ logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + " esResource = "
+ + esResource);
// Set the necessary configurations
SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index 20f02ad..f5a591e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -93,8 +93,8 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<L
}
// Compute the encrypted row elements for a query from extracted data partitions
- List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector,
- maxHitsPerSelector, useLocalCache);
+ List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector, maxHitsPerSelector,
+ useLocalCache);
returnPairs.addAll(encRowValues);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
index 038287b..8147ff6 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
@@ -34,8 +34,8 @@ import scala.Tuple2;
/**
* Functionality for computing the encrypted rows using a pre-computed, passed in modular exponentiation lookup table
*/
-public class EncRowCalcPrecomputedCache implements
- PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>>,Long,BigInteger>
+public class EncRowCalcPrecomputedCache
+ implements PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>>,Long,BigInteger>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
index fce905d..c2ed6ec 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -35,7 +35,8 @@ public class SparkResponder implements ResponderPlugin
private static final Logger logger = LoggerFactory.getLogger(SparkResponder.class);
@Override
- public String getPlatformName() {
+ public String getPlatformName()
+ {
return "spark";
}
@@ -47,8 +48,7 @@ public class SparkResponder implements ResponderPlugin
{
ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
computeResponse.performQuery();
- }
- catch (IOException e)
+ } catch (IOException e)
{
logger.error("Unable to open filesystem: {}", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index c291df0..492bfea 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -148,8 +148,8 @@ public class ComputeStreamingResponse
queryInput = SystemConfiguration.getProperty("pir.queryInput");
String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
- logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
- + " esResource = " + esResource);
+ logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + " esResource = "
+ + esResource);
// Pull the batchSeconds and windowLength parameters
long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
@@ -280,7 +280,8 @@ public class ComputeStreamingResponse
{
inputRDD = readDataES();
}
- else {
+ else
+ {
throw new PIRException("Unknown data input format " + dataInputFormat);
}
@@ -458,7 +459,7 @@ public class ComputeStreamingResponse
bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
// Form and write the response object
- encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long, BigInteger>>) rdd -> {
+ encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long,BigInteger>>) rdd -> {
rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
int maxBatchesVar = bVars.getMaxBatches();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
index 4ce0571..3178fef 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -36,7 +36,8 @@ public class SparkStreamingResponder implements ResponderPlugin
private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class);
@Override
- public String getPlatformName() {
+ public String getPlatformName()
+ {
return "sparkstreaming";
}
@@ -45,7 +46,7 @@ public class SparkStreamingResponder implements ResponderPlugin
{
// For handling System.exit calls from Spark Streaming
System.setSecurityManager(new SystemExitManager());
-
+
FileSystem fileSys;
try
{
@@ -54,15 +55,14 @@ public class SparkStreamingResponder implements ResponderPlugin
{
throw new PIRException(e);
}
-
+
logger.info("Launching Spark ComputeStreamingResponse:");
ComputeStreamingResponse computeSR = null;
try
{
computeSR = new ComputeStreamingResponse(fileSys);
computeSR.performQuery();
- }
- catch (SystemExitException e)
+ } catch (SystemExitException e)
{
// If System.exit(0) is not caught from Spark Streaming,
// the application will complete with a 'failed' status
@@ -70,8 +70,7 @@ public class SparkStreamingResponder implements ResponderPlugin
} catch (IOException e)
{
throw new PIRException(e);
- }
- finally
+ } finally
{
// Teardown the context
if (computeSR != null)
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
index 912850a..26b24bd 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -30,11 +30,12 @@ import org.apache.pirk.utils.PIRException;
public interface ResponderPlugin
{
/**
- * Returns the plugin name for your framework
- * This will be the platform argument
+ * Returns the plugin name for your framework This will be the platform argument
+ *
* @return
*/
public String getPlatformName();
+
/**
* This method launches your framework responder.
*/
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
index 75cd292..cbfd182 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
@@ -205,8 +205,8 @@ public class Responder
columns.put(i + rowCounter, column);
- logger.debug("exp = " + exp + " i = " + i + " partition = " + hitValPartitions.get(i) + " = " + hitValPartitions.get(i).toString(2) + " column = "
- + column);
+ logger.debug(
+ "exp = " + exp + " i = " + i + " partition = " + hitValPartitions.get(i) + " = " + hitValPartitions.get(i).toString(2) + " column = " + column);
logger.debug("After: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter));
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
index 5214c5f..db33b1a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
@@ -35,7 +35,8 @@ public class StandaloneResponder implements ResponderPlugin
private static final Logger logger = LoggerFactory.getLogger(StandaloneResponder.class);
@Override
- public String getPlatformName() {
+ public String getPlatformName()
+ {
return "standalone";
}
@@ -49,8 +50,7 @@ public class StandaloneResponder implements ResponderPlugin
Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
Responder pirResponder = new Responder(query);
pirResponder.computeStandaloneResponse();
- }
- catch (IOException e)
+ } catch (IOException e)
{
logger.error("Error reading {}, {}", queryInput, e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
index 90375aa..77edc24 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -121,8 +121,8 @@ public class EncColMultBolt extends BaseRichBolt
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
{
- outputFieldsDeclarer
- .declareStream(StormConstants.ENCCOLMULTBOLT_ID, new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD));
+ outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID,
+ new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD));
outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Fields("finished"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
index 8a5b854..3c8fe1a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -157,8 +157,8 @@ public class EncRowCalcBolt extends BaseRichBolt
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
{
- outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD,
- StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT));
+ outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+ new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT));
outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH));
}
@@ -166,7 +166,8 @@ public class EncRowCalcBolt extends BaseRichBolt
* Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
* colIndexByRow and hitsByRow appropriately.
*
- * @param tuple {@code Tuple}
+ * @param tuple
+ * {@code Tuple}
* @return {@code List<Tuple2>}
*/
private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index fd23c7e..a479883 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -133,8 +133,7 @@ public class PirkTopology
// b2.setMemoryLoad(5000);
// b2.setCPULoad(150.0);
- BoltDeclarer b3 = builder
- .setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
+ BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
.fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
.allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
index b988ccc..1f79993 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -31,6 +31,7 @@ public class StormResponder implements ResponderPlugin
{
private static final Logger logger = LoggerFactory.getLogger(StormResponder.class);
+
@Override
public String getPlatformName()
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
index bbffaba..b96e71f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -43,9 +43,12 @@ public class StormUtils
/**
* Method to read in serialized Query object from the given queryFile
*
- * @param useHdfs - true or false
- * @param hdfsUri - HDFS path
- * @param queryFile -
+ * @param useHdfs
+ * - true or false
+ * @param hdfsUri
+ * - HDFS path
+ * @param queryFile
+ * -
* @return {@link Query}
*/
public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index d1a4797..dfa33f9 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -71,9 +71,9 @@ public class DataSchemaLoader
{
private static final Logger logger = LoggerFactory.getLogger(DataSchemaLoader.class);
- private static Set<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
- PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE,
- PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
+ private static Set<String> allowedPrimitiveJavaTypes = new HashSet<>(
+ Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG,
+ PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
static
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/query/QuerySchema.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java
index cdc223e..6fa4dd5 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java
@@ -20,6 +20,7 @@ package org.apache.pirk.schema.query;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -59,6 +60,9 @@ public class QuerySchema implements Serializable
// Total number of bits to be returned for each data element hit.
private final int dataElementSize;
+ // Additional fields by key,value
+ private final HashMap<String,String> additionalFields = new HashMap<>();
+
QuerySchema(String schemaName, String dataSchemaName, String selectorName, String filterTypeName, DataFilter filter, int dataElementSize)
{
this.schemaName = schemaName;
@@ -153,4 +157,27 @@ public class QuerySchema implements Serializable
{
return filter;
}
+
+ /**
+ * Returns the map of additional field keys and values
+ * <p>
+ * Note that additional fields are optional, thus the map may be empty
+ *
+ * @return The additionalFields HashMap
+ */
+ public HashMap<String,String> getAdditionalFields()
+ {
+ return additionalFields;
+ }
+
+ /**
+ * Returns the value from the additionalFields mapping corresponding to the given key
+ *
+ * @param key
+ * @return value from the additionalFields mapping corresponding to the given key
+ */
+ public String getAdditionalFieldValue(String key)
+ {
+ return additionalFields.get(key);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index c1b4139..949396b 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -21,6 +21,7 @@ package org.apache.pirk.schema.query;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
@@ -65,6 +66,12 @@ import org.xml.sax.SAXException;
* <filterNames> (optional)
* <name> element name of element in the data schema to apply pre-processing filters </name>
* </filterNames>
+ * <additional> (optional) additional fields for the query schema, in <key,value> pairs
+ * <field>
+ * <key> key corresponding the the field </key>
+ * <value> value corresponding to the field </value>
+ * </field>
+ * </additional>
* </schema>
* }
* </pre>
@@ -118,6 +125,7 @@ public class QuerySchemaLoader
String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
if (querySchemas.equals("none"))
{
+ logger.info("query.schemas = none");
return;
}
@@ -260,10 +268,29 @@ public class QuerySchemaLoader
Set<String> filteredNamesSet = extractFilteredElementNames(doc);
DataFilter filter = instantiateFilter(filterTypeName, filteredNamesSet);
+ // Extract the additional fields, if they exists
+ HashMap<String,String> additionalFields = new HashMap<String,String>();
+ if (doc.getElementsByTagName("additional").item(0) != null)
+ {
+ NodeList fieldList = doc.getElementsByTagName("field");
+ int numFields = fieldList.getLength();
+ if (numFields == 0)
+ {
+ throw new PIRException("numFields = " + numFields + " -- should be at least one");
+ }
+ for (int i = 0; i < numFields; ++i)
+ {
+ Element fields = (Element) fieldList.item(i);
+ NodeList kv = fields.getChildNodes();
+ additionalFields.put(getNodeValue("key", kv), getNodeValue("value", kv));
+ }
+ }
+
// Create and return the query schema object.
QuerySchema querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize);
querySchema.getElementNames().addAll(elementNames);
querySchema.getFilteredElementNames().addAll(filteredNamesSet);
+ querySchema.getAdditionalFields().putAll(additionalFields);
return querySchema;
}
@@ -358,6 +385,30 @@ public class QuerySchemaLoader
}
/**
+ * Extracts the value corresponding to a given tag from the XML nodeList
+ *
+ * @param tagName
+ * The name of the tag for which to extract the value
+ * @param nodes
+ * The NodeList
+ * @return The given value
+ */
+ private String getNodeValue(String tagName, NodeList nodes)
+ {
+ String value = "";
+
+ for (int x = 0; x < nodes.getLength(); x++)
+ {
+ Node node = nodes.item(x);
+ if (node.getNodeName().equals(tagName))
+ {
+ value = node.getChildNodes().item(0).getNodeValue().trim();
+ }
+ }
+ return value;
+ }
+
+ /**
* Instantiate the specified filter.
*
* Exceptions derive from call to the {@code getFilter} method of {@link FilterFactory}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/BaseTests.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index 7536d8f..bb2ed66 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -51,7 +51,8 @@ public class BaseTests
public static final int dataPartitionBitSize = 8;
// Selectors for domain and IP queries, queryIdentifier is the first entry for file generation
- private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+ private static ArrayList<String> selectorsDomain = new ArrayList<>(
+ Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
private static ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24"));
// Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed...
@@ -65,8 +66,7 @@ public class BaseTests
testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive, false);
}
- public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
- throws Exception
+ public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
{
testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false, false);
}
@@ -276,8 +276,7 @@ public class BaseTests
}
// A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String)
- public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
- throws Exception
+ public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
{
logger.info("Running testDNSNXDOMAINQuery(): ");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/Inputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java
index b6e7251..e107c56 100644
--- a/src/main/java/org/apache/pirk/test/utils/Inputs.java
+++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java
@@ -460,16 +460,16 @@ public class Inputs
TestUtils.createQuerySchema(DNS_IP_QUERY_FILE, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter);
if (hdfs)
{
- TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter,
- false, fs, hdfs);
+ TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter, false,
+ fs, hdfs);
}
// DNS_NXDOMAIN_QUERY
List<String> dnsNXQueryElements = Arrays.asList(QNAME, SRCIP, DSTIP);
List<String> dnsNXQueryFilterElements = Collections.singletonList(QNAME);
- TestUtils
- .createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, filter);
+ TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements,
+ filter);
if (hdfs)
{
TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE_HDFS, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements,
@@ -480,8 +480,8 @@ public class Inputs
List<String> dnsSrcIPQueryElements = Arrays.asList(QNAME, DSTIP, IPS);
List<String> dnsSrcIPQueryFilterElements = Arrays.asList(SRCIP, IPS);
- TestUtils
- .createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, filter);
+ TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements,
+ filter);
if (hdfs)
{
TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE_HDFS, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements,
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/TestUtils.java b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
index 05c9f28..9f62250 100644
--- a/src/main/java/org/apache/pirk/test/utils/TestUtils.java
+++ b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
@@ -131,7 +132,7 @@ public class TestUtils
public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
List<String> elementNames, List<String> filterNames, String filter) throws IOException
{
- createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false);
+ createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false, null);
}
/**
@@ -140,6 +141,16 @@ public class TestUtils
public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs) throws IOException
{
+ createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, append, fs, hdfs, null);
+ }
+
+ /**
+ * Creates the test query schema file
+ */
+ public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput,
+ List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs, HashMap<String,String> additionalFields)
+ throws IOException
+ {
logger.info("createQuerySchema: querySchemaName = " + querySchemaName);
// Create a temporary file for the test schema, set in the properties
@@ -176,6 +187,7 @@ public class TestUtils
SystemConfiguration.setProperty("query.schemas", SystemConfiguration.getProperty("query.schemas", "") + "," + fileName);
}
}
+
logger.info("query.schemas = " + SystemConfiguration.getProperty("query.schemas"));
// Write to the file
@@ -234,6 +246,30 @@ public class TestUtils
}
}
+ // Add the additionalFields
+ if (additionalFields != null)
+ {
+ Element additionalElement = doc.createElement("additional");
+ rootElement.appendChild(additionalElement);
+
+ // Add the key,value pairs
+ for (String key : additionalFields.keySet())
+ {
+ logger.info("Creating field element with key = " + key + " and value = " + additionalFields.get(key));
+
+ Element fieldElement = doc.createElement("field");
+ additionalElement.appendChild(fieldElement);
+
+ Element keyElement = doc.createElement("key");
+ keyElement.appendChild(doc.createTextNode(key));
+ fieldElement.appendChild(keyElement);
+
+ Element valueElement = doc.createElement("value");
+ valueElement.appendChild(doc.createTextNode(additionalFields.get(key)));
+ fieldElement.appendChild(valueElement);
+ }
+ }
+
// Write to a xml file
TransformerFactory transformerFactory = TransformerFactory.newInstance();
Transformer transformer = transformerFactory.newTransformer();
@@ -268,7 +304,8 @@ public class TestUtils
/**
* Converts the result file into an ArrayList of QueryResponseJSON objects
*
- * @throws IOException - {@link IOException}
+ * @throws IOException
+ * - {@link IOException}
*/
public static List<QueryResponseJSON> readResultsFile(File file) throws IOException
{
@@ -288,7 +325,9 @@ public class TestUtils
/**
* Write the ArrayList<String to a tmp file in the local filesystem with the given fileName
- * @throws IOException - {@link IOException}
+ *
+ * @throws IOException
+ * - {@link IOException}
*/
public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/utils/QueryParserUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java
index 617e6dc..f8df2c7 100644
--- a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java
+++ b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java
@@ -719,8 +719,8 @@ public class QueryParserUtils
}
else
{
- if (!((Integer.parseInt(blocksLower[ipBlock]) <= Integer.parseInt(ipValue[ipBlock])) && (Integer.parseInt(ipValue[ipBlock]) <= Integer
- .parseInt(blocksUpper[ipBlock]))))
+ if (!((Integer.parseInt(blocksLower[ipBlock]) <= Integer.parseInt(ipValue[ipBlock]))
+ && (Integer.parseInt(ipValue[ipBlock]) <= Integer.parseInt(blocksUpper[ipBlock]))))
{
logger.info("IP block not within given range");
matches = false;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
index 4f2fa03..a5c27a9 100755
--- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
+++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
@@ -45,352 +45,353 @@ import org.slf4j.LoggerFactory;
*/
public class SystemConfiguration
{
- private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class);
-
- private static final Properties props = new Properties();
-
- /**
- * By default, these files should be found on the root of the classpath
- */
- private static final String DEFAULT_PROPERTY_FILE = "pirk.properties";
- private static final String QUERIER_PROPERTIES_FILE = "querier.properties";
- private static final String RESPONDER_PROPERTIES_FILE = "responder.properties";
-
- private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir";
-
- static
- {
- initialize();
- }
-
- public static void initialize()
- {
- props.clear();
-
- // First try to load the default properties file
- loadPropsFromResource(DEFAULT_PROPERTY_FILE);
-
- // Try to load props from the querier and responder property files, if they exist
- loadPropsFromResource(QUERIER_PROPERTIES_FILE);
- loadPropsFromResource(RESPONDER_PROPERTIES_FILE);
-
- // Try to load the local properties files, if they exists
- loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR));
- }
-
- /**
- * Gets the specified property; returns <code>null</code> if the property isn't found.
- *
- * @param propertyName
- * The name of the requested property.
- * @return The value of the property, or <code>null</code> if the property cannot be found.
- */
- public static String getProperty(String propertyName)
- {
- return props.getProperty(propertyName);
- }
-
- /**
- * Gets the specified property as a <code>String</code>, or the default value if the property isn't found.
- *
- * @param propertyName
- * The name of the requested string property value.
- * @param defaultValue
- * The value to return if the property is undefined.
- * @return The value of the requested property, or the default value if the property is undefined.
- */
- public static String getProperty(String propertyName, String defaultValue)
- {
- return props.getProperty(propertyName, defaultValue);
- }
-
- /**
- * Gets the specified property as an <code>int</code>, or the default value if the property isn't found.
- *
- * @param propertyName
- * The name of the requested int property value.
- * @param defaultValue
- * The value to return if the property is undefined.
- * @return The value of the requested property, or the default value if the property is undefined.
- * @throws NumberFormatException
- * If the property does not contain a parsable <code>int</code> value.
- */
- public static int getIntProperty(String propertyName, int defaultValue)
- {
- String value = props.getProperty(propertyName);
- return (value == null) ? defaultValue : Integer.parseInt(value);
- }
-
- /**
- * Gets the specified property as an <code>long</code>, or the default value if the property isn't found.
- *
- * @param propertyName
- * The name of the requested long property value.
- * @param defaultValue
- * The value to return if the property is undefined.
- * @return The value of the requested property, or the default value if the property is undefined.
- * @throws NumberFormatException
- * If the property does not contain a parsable <code>long</code> value.
- */
- public static long getLongProperty(String propertyName, long defaultValue)
- {
- String value = props.getProperty(propertyName);
- return (value == null) ? defaultValue : Long.parseLong(value);
- }
-
- /**
- * Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined.
- *
- * @param propertyName
- * The name of the requested boolean property value.
- * @param defaultValue
- * The value to return if the property is undefined.
- * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>defaultValue</code>.
- */
- public static boolean getBooleanProperty(String propertyName, boolean defaultValue)
- {
- return (isSetTrue(propertyName)) || defaultValue;
- }
-
- /**
- * Returns <code>true</code> iff the specified boolean property value is "true".
- * <p>
- * If the property is not found, or it's value is not "true" then the method will return <code>false</code>.
- *
- * @param propertyName
- * The name of the requested boolean property value.
- * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>false</code>.
- */
- public static boolean isSetTrue(String propertyName)
- {
- String value = props.getProperty(propertyName);
- return "true".equals(value);
- }
-
- /**
- * Sets the property to the given value.
- * <p>
- * Any previous values stored at the same property name are replaced.
- *
- * @param propertyName
- * The name of the property to set.
- * @param value
- * The property value.
- */
- public static void setProperty(String propertyName, String value)
- {
- props.setProperty(propertyName, value);
- }
-
- /**
- * Returns true iff the given property name is defined.
- *
- * @param propertyName
- * The property name to test.
- * @return <code>true</code> if the property is found in the configuration, or <code>false</code> otherwise.
- */
- public static boolean hasProperty(String propertyName)
- {
- return props.containsKey(propertyName);
- }
-
- /**
- * Appends a property via a comma separated list
- * <p>
- * If the property does not exist, it adds it.
- *
- * @param propertyName
- * The property whose value is to be appended with the given value.
- * @param value
- * The value to be stored, or appended to the current value.
- */
- public static void appendProperty(String propertyName, String value)
- {
- String oldValue = props.getProperty(propertyName);
-
- if (oldValue != null && !oldValue.equals("none"))
- {
- oldValue += "," + value;
- }
- else
- {
- oldValue = value;
- }
- props.setProperty(propertyName, oldValue);
- }
-
- /**
- * Loads the properties from local properties file in the specified directory.
- * <p>
- * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration.
- *
- * @param dirName
- * The directory to search for the new properties files.
- */
- public static void loadPropsFromDir(String dirName)
- {
- logger.info("Loading properties from dirName = " + dirName);
- File[] directoryListing = new File(dirName).listFiles(new FilenameFilter()
- {
- @Override
- public boolean accept(File dir, String name)
- {
- return name.endsWith(".properties");
- }
- });
-
- if (directoryListing != null)
- {
- for (File file : directoryListing)
- {
- loadPropsFromFile(file);
- }
- }
- }
-
- /**
- * Loads the properties from local properties file in the specified directory in hdfs.
- * <p>
- * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration.
- *
- * @param dirName
- * The directory to search for the new properties files.
- * @throws IOException
- * @throws FileNotFoundException
- */
- public static void loadPropsFromHDFSDir(String dirName, FileSystem fs) throws FileNotFoundException, IOException
- {
- logger.info("Loading properties from dirName = " + dirName);
-
- Path dirPath = new Path(dirName);
-
- FileStatus[] status = fs.listStatus(dirPath);
- for (int i=0;i<status.length;i++)
- {
- if(status[i].getPath().getName().endsWith(".properties"))
- {
- loadPropsFromFile(status[i].getPath(), fs);
- }
- }
- }
-
- /**
- * Loads the properties from the specified file.
- * <p>
- * The new properties are added to the current system configuration.
- *
- * @param file
- * The properties file containing the system properties to add.
- */
- public static void loadPropsFromFile(File file)
- {
- if (file.exists())
- {
- try (InputStream stream = new FileInputStream(file))
- {
- logger.info("Loading properties file '" + file.getAbsolutePath() + "'");
- loadProperties(stream);
- } catch (IOException e)
- {
- logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'");
- e.printStackTrace();
- }
- }
- else
- {
- logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'");
- }
- }
-
- /**
- * Loads the properties from the specified file in hdfs
- * <p>
- * The new properties are added to the current system configuration.
- *
- * @param file
- * The properties file containing the system properties to add.
- * @throws IOException
- */
- public static void loadPropsFromFile(String filename, FileSystem fs) throws IOException
- {
- Path p = new Path(filename);
- loadPropsFromFile(p, fs);
- }
-
- /**
- * Loads the properties from the specified file in hdfs
- * <p>
- * The new properties are added to the current system configuration.
- *
- * @param file
- * The properties file containing the system properties to add.
- * @throws IOException
- */
- public static void loadPropsFromFile(Path filePath, FileSystem fs) throws IOException
- {
- if(fs.exists(filePath))
- {
- try (InputStream stream = fs.open(filePath);)
- {
- logger.info("Loading properties file from hdfs'" + filePath.toString() + "'");
- loadProperties(stream);
- } catch (IOException e)
- {
- logger.error("Problem loading properties file from hdfs '" + filePath.toString() + "'");
- e.printStackTrace();
- }
- }
- else
- {
- logger.warn("Properties file does not exist: '" + filePath.toString() + "'");
- }
- }
-
- /**
- * Loads the properties from the specified resource on the current classloader.
- * <p>
- * The new properties are added to the current system configuration.
- *
- * @param name
- * The name of the resource defining the properties.
- */
- public static void loadPropsFromResource(String name)
- {
- try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name))
- {
- if (stream != null)
- {
- logger.info("Loading file '" + name + "'");
- loadProperties(stream);
- }
- else
- {
- logger.error("No file found '" + name + "'");
- }
- } catch (IOException e)
- {
- logger.error("Problem loading file '" + name + "'");
- e.printStackTrace();
- }
- }
-
- /**
- * Load the properties in the Properties object and then trim any whitespace
- * <p>
- * Properties.load does not do this automatically
- * @throws IOException
- */
- public static void loadProperties(InputStream stream) throws IOException
- {
- props.load(stream);
-
- Enumeration propKeys = props.propertyNames();
- while(propKeys.hasMoreElements())
- {
- String tmpKey = (String)propKeys.nextElement();
- String tmpValue = props.getProperty(tmpKey);
- tmpValue = tmpValue.trim();
- props.put(tmpKey, tmpValue);
- }
- }
+ private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class);
+
+ private static final Properties props = new Properties();
+
+ /**
+ * By default, these files should be found on the root of the classpath
+ */
+ private static final String DEFAULT_PROPERTY_FILE = "pirk.properties";
+ private static final String QUERIER_PROPERTIES_FILE = "querier.properties";
+ private static final String RESPONDER_PROPERTIES_FILE = "responder.properties";
+
+ private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir";
+
+ static
+ {
+ initialize();
+ }
+
+ public static void initialize()
+ {
+ props.clear();
+
+ // First try to load the default properties file
+ loadPropsFromResource(DEFAULT_PROPERTY_FILE);
+
+ // Try to load props from the querier and responder property files, if they exist
+ loadPropsFromResource(QUERIER_PROPERTIES_FILE);
+ loadPropsFromResource(RESPONDER_PROPERTIES_FILE);
+
+ // Try to load the local properties files, if they exists
+ loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR));
+ }
+
+ /**
+ * Gets the specified property; returns <code>null</code> if the property isn't found.
+ *
+ * @param propertyName
+ * The name of the requested property.
+ * @return The value of the property, or <code>null</code> if the property cannot be found.
+ */
+ public static String getProperty(String propertyName)
+ {
+ return props.getProperty(propertyName);
+ }
+
+ /**
+ * Gets the specified property as a <code>String</code>, or the default value if the property isn't found.
+ *
+ * @param propertyName
+ * The name of the requested string property value.
+ * @param defaultValue
+ * The value to return if the property is undefined.
+ * @return The value of the requested property, or the default value if the property is undefined.
+ */
+ public static String getProperty(String propertyName, String defaultValue)
+ {
+ return props.getProperty(propertyName, defaultValue);
+ }
+
+ /**
+ * Gets the specified property as an <code>int</code>, or the default value if the property isn't found.
+ *
+ * @param propertyName
+ * The name of the requested int property value.
+ * @param defaultValue
+ * The value to return if the property is undefined.
+ * @return The value of the requested property, or the default value if the property is undefined.
+ * @throws NumberFormatException
+ * If the property does not contain a parsable <code>int</code> value.
+ */
+ public static int getIntProperty(String propertyName, int defaultValue)
+ {
+ String value = props.getProperty(propertyName);
+ return (value == null) ? defaultValue : Integer.parseInt(value);
+ }
+
+ /**
+ * Gets the specified property as an <code>long</code>, or the default value if the property isn't found.
+ *
+ * @param propertyName
+ * The name of the requested long property value.
+ * @param defaultValue
+ * The value to return if the property is undefined.
+ * @return The value of the requested property, or the default value if the property is undefined.
+ * @throws NumberFormatException
+ * If the property does not contain a parsable <code>long</code> value.
+ */
+ public static long getLongProperty(String propertyName, long defaultValue)
+ {
+ String value = props.getProperty(propertyName);
+ return (value == null) ? defaultValue : Long.parseLong(value);
+ }
+
+ /**
+ * Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined.
+ *
+ * @param propertyName
+ * The name of the requested boolean property value.
+ * @param defaultValue
+ * The value to return if the property is undefined.
+ * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>defaultValue</code>.
+ */
+ public static boolean getBooleanProperty(String propertyName, boolean defaultValue)
+ {
+ return (isSetTrue(propertyName)) || defaultValue;
+ }
+
+ /**
+ * Returns <code>true</code> iff the specified boolean property value is "true".
+ * <p>
+ * If the property is not found, or it's value is not "true" then the method will return <code>false</code>.
+ *
+ * @param propertyName
+ * The name of the requested boolean property value.
+ * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>false</code>.
+ */
+ public static boolean isSetTrue(String propertyName)
+ {
+ String value = props.getProperty(propertyName);
+ return "true".equals(value);
+ }
+
+ /**
+ * Sets the property to the given value.
+ * <p>
+ * Any previous values stored at the same property name are replaced.
+ *
+ * @param propertyName
+ * The name of the property to set.
+ * @param value
+ * The property value.
+ */
+ public static void setProperty(String propertyName, String value)
+ {
+ props.setProperty(propertyName, value);
+ }
+
+ /**
+ * Returns true iff the given property name is defined.
+ *
+ * @param propertyName
+ * The property name to test.
+ * @return <code>true</code> if the property is found in the configuration, or <code>false</code> otherwise.
+ */
+ public static boolean hasProperty(String propertyName)
+ {
+ return props.containsKey(propertyName);
+ }
+
+ /**
+ * Appends a property via a comma separated list
+ * <p>
+ * If the property does not exist, it adds it.
+ *
+ * @param propertyName
+ * The property whose value is to be appended with the given value.
+ * @param value
+ * The value to be stored, or appended to the current value.
+ */
+ public static void appendProperty(String propertyName, String value)
+ {
+ String oldValue = props.getProperty(propertyName);
+
+ if (oldValue != null && !oldValue.equals("none"))
+ {
+ oldValue += "," + value;
+ }
+ else
+ {
+ oldValue = value;
+ }
+ props.setProperty(propertyName, oldValue);
+ }
+
+ /**
+ * Loads the properties from local properties file in the specified directory.
+ * <p>
+ * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration.
+ *
+ * @param dirName
+ * The directory to search for the new properties files.
+ */
+ public static void loadPropsFromDir(String dirName)
+ {
+ logger.info("Loading properties from dirName = " + dirName);
+ File[] directoryListing = new File(dirName).listFiles(new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith(".properties");
+ }
+ });
+
+ if (directoryListing != null)
+ {
+ for (File file : directoryListing)
+ {
+ loadPropsFromFile(file);
+ }
+ }
+ }
+
+ /**
+ * Loads the properties from local properties file in the specified directory in hdfs.
+ * <p>
+ * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration.
+ *
+ * @param dirName
+ * The directory to search for the new properties files.
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ public static void loadPropsFromHDFSDir(String dirName, FileSystem fs) throws FileNotFoundException, IOException
+ {
+ logger.info("Loading properties from dirName = " + dirName);
+
+ Path dirPath = new Path(dirName);
+
+ FileStatus[] status = fs.listStatus(dirPath);
+ for (int i = 0; i < status.length; i++)
+ {
+ if (status[i].getPath().getName().endsWith(".properties"))
+ {
+ loadPropsFromFile(status[i].getPath(), fs);
+ }
+ }
+ }
+
+ /**
+ * Loads the properties from the specified file.
+ * <p>
+ * The new properties are added to the current system configuration.
+ *
+ * @param file
+ * The properties file containing the system properties to add.
+ */
+ public static void loadPropsFromFile(File file)
+ {
+ if (file.exists())
+ {
+ try (InputStream stream = new FileInputStream(file))
+ {
+ logger.info("Loading properties file '" + file.getAbsolutePath() + "'");
+ loadProperties(stream);
+ } catch (IOException e)
+ {
+ logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'");
+ e.printStackTrace();
+ }
+ }
+ else
+ {
+ logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'");
+ }
+ }
+
+ /**
+ * Loads the properties from the specified file in hdfs
+ * <p>
+ * The new properties are added to the current system configuration.
+ *
+ * @param file
+ * The properties file containing the system properties to add.
+ * @throws IOException
+ */
+ public static void loadPropsFromFile(String filename, FileSystem fs) throws IOException
+ {
+ Path p = new Path(filename);
+ loadPropsFromFile(p, fs);
+ }
+
+ /**
+ * Loads the properties from the specified file in hdfs
+ * <p>
+ * The new properties are added to the current system configuration.
+ *
+ * @param file
+ * The properties file containing the system properties to add.
+ * @throws IOException
+ */
+ public static void loadPropsFromFile(Path filePath, FileSystem fs) throws IOException
+ {
+ if (fs.exists(filePath))
+ {
+ try (InputStream stream = fs.open(filePath);)
+ {
+ logger.info("Loading properties file from hdfs'" + filePath.toString() + "'");
+ loadProperties(stream);
+ } catch (IOException e)
+ {
+ logger.error("Problem loading properties file from hdfs '" + filePath.toString() + "'");
+ e.printStackTrace();
+ }
+ }
+ else
+ {
+ logger.warn("Properties file does not exist: '" + filePath.toString() + "'");
+ }
+ }
+
+ /**
+ * Loads the properties from the specified resource on the current classloader.
+ * <p>
+ * The new properties are added to the current system configuration.
+ *
+ * @param name
+ * The name of the resource defining the properties.
+ */
+ public static void loadPropsFromResource(String name)
+ {
+ try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name))
+ {
+ if (stream != null)
+ {
+ logger.info("Loading file '" + name + "'");
+ loadProperties(stream);
+ }
+ else
+ {
+ logger.error("No file found '" + name + "'");
+ }
+ } catch (IOException e)
+ {
+ logger.error("Problem loading file '" + name + "'");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Load the properties in the Properties object and then trim any whitespace
+ * <p>
+ * Properties.load does not do this automatically
+ *
+ * @throws IOException
+ */
+ public static void loadProperties(InputStream stream) throws IOException
+ {
+ props.load(stream);
+
+ Enumeration propKeys = props.propertyNames();
+ while (propKeys.hasMoreElements())
+ {
+ String tmpKey = (String) propKeys.nextElement();
+ String tmpValue = props.getProperty(tmpKey);
+ tmpValue = tmpValue.trim();
+ props.put(tmpKey, tmpValue);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/resources/pirk.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index a88c846..543b8b1 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -69,6 +69,12 @@ data.schemas = none
## <filterNames>
## <name> (optional) element name of element in the data schema to apply pre-processing filters </name>
## </filterNames>
+## <additional> (optional) additional fields for the query schema, in <key,value> pairs
+## <field>
+## <key> key corresponding the the field </key>
+## <value> value corresponding to the field </value>
+## </field>
+## </additional>
## </schema>
##
##
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/resources/query-schema.xsd
----------------------------------------------------------------------
diff --git a/src/main/resources/query-schema.xsd b/src/main/resources/query-schema.xsd
index 65a36ce..db339b7 100644
--- a/src/main/resources/query-schema.xsd
+++ b/src/main/resources/query-schema.xsd
@@ -1,119 +1,158 @@
<?xml version="1.0" encoding="UTF-8" ?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with ~
+ this work for additional information regarding copyright ownership. ~ The
+ ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+ "License"); you may not use this file except in compliance with ~ the License.
+ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, software ~
+ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+ License for the specific language governing permissions and ~ limitations
+ under the License. -->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
- targetNamespace="http://pirk.apache.org" xmlns="http://pirk.apache.org"
- elementFormDefault="qualified">
+ targetNamespace="http://pirk.apache.org" xmlns="http://pirk.apache.org"
+ elementFormDefault="qualified">
- <xs:element name="schema">
- <xs:complexType>
- <xs:sequence>
- <xs:element name="schemaName" type="xs:string">
- <xs:annotation>
- <xs:documentation>
- The name of the query schema.
- The name omits leading and trailing
- whitespace, and is case sensitive.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="schema">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="schemaName" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>
+ The name of the query schema.
+ The name omits
+ leading and trailing
+ whitespace, and is case sensitive.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- <xs:element name="dataSchemaName" type="xs:string">
- <xs:annotation>
- <xs:documentation>
- The name of the data schema
- over which this query is run. The name omits
- leading and trailing whitespace, and is case
- sensitive.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="dataSchemaName" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>
+ The name of the data schema
+ over which this query
+ is run. The name omits
+ leading and trailing whitespace, and is
+ case
+ sensitive.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- <xs:element name="selectorName" type="xs:string">
- <xs:annotation>
- <xs:documentation>The name of the name of the
- element in the data schema that will be the
- selector for this query.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="selectorName" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>The name of the name of the
+ element in the data
+ schema that will be the
+ selector for this query.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- <xs:element name="elements">
- <xs:annotation>
- <xs:documentation>
- The set of element names to
- include in the query response.
- </xs:documentation>
- </xs:annotation>
- <xs:complexType>
- <xs:sequence>
+ <xs:element name="elements">
+ <xs:annotation>
+ <xs:documentation>
+ The set of element names to
+ include in the query
+ response.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:complexType>
+ <xs:sequence>
- <xs:element name="name" type="xs:string"
- maxOccurs="unbounded">
- <xs:annotation>
- <xs:documentation>
- The name of an
- element in the data schema to
- include in the query response.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="name" type="xs:string" maxOccurs="unbounded">
+ <xs:annotation>
+ <xs:documentation>
+ The name of an
+ element in the data schema to
+ include in the query response.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- </xs:sequence>
- </xs:complexType>
- </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
- <xs:element name="filter" type="xs:string"
- minOccurs="0">
- <xs:annotation>
- <xs:documentation>
- The name of a class used to
- filter the query response data.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="filter" type="xs:string" minOccurs="0">
+ <xs:annotation>
+ <xs:documentation>
+ The name of a class used to
+ filter the query
+ response data.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- <xs:element name="filterNames" minOccurs="0"
- maxOccurs="unbounded">
- <xs:annotation>
- <xs:documentation>
- The set of data element names
- over which the
- response filter is applied.
- </xs:documentation>
- </xs:annotation>
- <xs:complexType>
- <xs:sequence>
+ <xs:element name="filterNames" minOccurs="0" maxOccurs="unbounded">
+ <xs:annotation>
+ <xs:documentation>
+ The set of data element names
+ over which the
+ response filter is applied.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:complexType>
+ <xs:sequence>
- <xs:element name="name" type="xs:string">
- <xs:annotation>
- <xs:documentation>
- The name of an
- element in the data schema over
- which to apply the filter.
- </xs:documentation>
- </xs:annotation>
- </xs:element>
+ <xs:element name="name" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>
+ The name of an
+ element in the data schema over
+ which to apply the filter.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
- </xs:sequence>
- </xs:complexType>
- </xs:element>
- </xs:sequence>
- </xs:complexType>
- </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="additional" minOccurs="0">
+ <xs:annotation>
+ <xs:documentation>
+ Additional set of fields to include in the query
+ schema.
+ </xs:documentation>
+ </xs:annotation>
+ <xs:complexType>
+ <xs:sequence>
+
+ <xs:element name="field" maxOccurs="unbounded">
+ <xs:annotation>
+ <xs:documentation>
+ Additional field to include in the query schema
+ </xs:documentation>
+ </xs:annotation>
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="key" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>
+ The key corresponding to the the field
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
+ <xs:element name="value" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>
+ The value corresponding to the field
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
</xs:schema>
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
index bb70153..cab327e 100644
--- a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
+++ b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
@@ -326,7 +326,8 @@ public class QueryParserUtilsTest
private void testBooleanQueryMapMapWritableWAW()
{
- assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema));
+ assertTrue(
+ QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema));
assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:5+OR+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema));
assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+rcode:0+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]",