Posted to dev@pirk.apache.org by DarinJ <gi...@git.apache.org> on 2016/10/13 12:46:02 UTC

[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

GitHub user DarinJ opened a pull request:

    https://github.com/apache/incubator-pirk/pull/108

    WIP: Submodule Refactor Phase 1

    This is going to have to be broken into chunks.
    
    -1. Pulled out mapreduce, storm and spark (sparkstreaming in spark) responders.
    -2. Added methods to ResponderPlugin interface to set up a responder for a test and return the decrypted response.
    -3. Refactored `org.apache.pirk.test.DistributedTestSuite` and `org.apache.pirk.test.Utils.BaseTests` to handle submodules.  This will likely need more work: the unit tests in pirk-core depend on some of the code in `org.apache.pirk.test.Utils.BaseTests`, and a goal is to pull the distributed tests into a submodule.
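
To illustrate the second item, here is a minimal sketch of what test hooks on a responder plugin interface might look like. The names `TestableResponder`, `setupForTest`, `decryptedResults` and the `test.input` property key are hypothetical and are not the methods added in this PR; the real `ResponderPlugin` signatures may well differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ResponderTestHooksSketch
{
  /** Hypothetical plugin interface; NOT the real ResponderPlugin API. */
  public interface TestableResponder
  {
    void run();

    /** Hypothetical hook: configure the responder for a test run. */
    void setupForTest(Properties testProps);

    /** Hypothetical hook: return the decrypted results once run() has completed. */
    List<String> decryptedResults();
  }

  /** Toy implementation that "responds" by splitting a comma-separated test input. */
  public static class EchoResponder implements TestableResponder
  {
    private String input = "";
    private final List<String> results = new ArrayList<>();

    @Override
    public void setupForTest(Properties testProps)
    {
      // "test.input" is a made-up property key for this sketch.
      input = testProps.getProperty("test.input", "");
    }

    @Override
    public void run()
    {
      results.clear();
      for (String element : input.split(","))
      {
        if (!element.isEmpty())
          results.add(element);
      }
    }

    @Override
    public List<String> decryptedResults()
    {
      return new ArrayList<>(results);
    }
  }

  /** A test driver written against the interface needs no framework-specific types. */
  public static List<String> runTest(TestableResponder responder, Properties testProps)
  {
    responder.setupForTest(testProps);
    responder.run();
    return responder.decryptedResults();
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("test.input", "a,b");
    System.out.println(runTest(new EchoResponder(), props));
  }
}
```

The point of the shape is that a shared test suite can stay in one module and call each framework's responder only through the interface.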
    
    I've tested the Distributed Tests with the JSON input format on a local K8s Hadoop cluster.  I haven't been able to set up an Elasticsearch instance yet, but it looks like those tests "fail correctly".
    
    Todo: Pull out the Querier, Responder, Benchmarks, etc. modules as mentioned on the mailing list.  Some methods in the framework responder impls need to be pulled out into either an abstract class, an interface with default methods, or a class with static methods.
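
As a rough sketch of the "interface with default methods" option, the helpers that are currently duplicated across responders (such as `deleteOutput` and `getTempFile`) could live in one shared interface. Everything below is an assumption for illustration: `OutputFileSupport` and the two `*LikeResponder` classes are hypothetical names, and `java.nio.file` stands in for Hadoop's `FileSystem` so that the example is self-contained.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SharedResponderHelpersSketch
{
  /** Hypothetical mix-in; uses java.nio instead of Hadoop's FileSystem. */
  public interface OutputFileSupport
  {
    /** Ensure old output does not exist (single file only; not a recursive delete). */
    default void deleteOutput(Path output)
    {
      try
      {
        Files.deleteIfExists(output);
      } catch (IOException e)
      {
        throw new UncheckedIOException("Failed to delete output file " + output, e);
      }
    }

    /** Create the temp file that the final results are written to. */
    default File getTempFile()
    {
      try
      {
        return File.createTempFile("finalResultsFile", ".txt");
      } catch (IOException e)
      {
        throw new UncheckedIOException("Failed to create temp file", e);
      }
    }
  }

  /** Both hypothetical responders inherit the helpers without copying them. */
  public static class MapReduceLikeResponder implements OutputFileSupport
  {}

  public static class SparkLikeResponder implements OutputFileSupport
  {}

  public static void main(String[] args)
  {
    OutputFileSupport responder = new MapReduceLikeResponder();
    File tmp = responder.getTempFile();
    responder.deleteOutput(tmp.toPath());
    System.out.println("exists after delete: " + tmp.exists());
  }
}
```

An abstract base class would work similarly; default methods have the advantage that a responder can still extend some other framework class.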
    
    This is still rough, but I think it's the right direction.  I took notes on the DistributedTestSuite and how state is being moved to various locations via system configurations.  I'll post those and suggestions to the mailing list.  Likely it'll be useful to the configuration discussion.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DarinJ/incubator-pirk submoduleRefactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-pirk/pull/108.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #108
    
----
commit 369ab26a962ce90585444484ad734748b1d52cc7
Author: Darin Johnson <da...@apache.org>
Date:   2016-09-29T02:44:31Z

    Submodule Refactor Phase
    -1. Pulled out mapreduce, storm and spark responders,
    -2. Refactored `org.apache.pirk.test.DistributedTestSuite` to handle submodules.
    
     Todo Pull out Querier, Responder Modules as mentioned on the mailing list.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by ellisonanne <gi...@git.apache.org>.
Github user ellisonanne commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83564697
  
    --- Diff: pirk-core/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java ---
    @@ -18,12 +18,7 @@
      */
     package org.apache.pirk.test.distributed;
     
    -import org.apache.commons.cli.CommandLine;
    -import org.apache.commons.cli.CommandLineParser;
    -import org.apache.commons.cli.GnuParser;
    -import org.apache.commons.cli.HelpFormatter;
    -import org.apache.commons.cli.Option;
    -import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.*;
    --- End diff --
    
    We need to take out the * imports; they were probably added inadvertently by your IDE. This occurs throughout the PR.



[GitHub] incubator-pirk issue #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ commented on the issue:

    https://github.com/apache/incubator-pirk/pull/108
  
    Still need to take care of some * imports and the LICENSE/NOTICE files.



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83767214
  
    --- Diff: pirk-mapreduce/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.responder.wideskies.mapreduce;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.ToolRunner;
    +import org.apache.pirk.querier.wideskies.Querier;
    +import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
    +import org.apache.pirk.responder.wideskies.ResponderProps;
    +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
    +import org.apache.pirk.response.wideskies.Response;
    +import org.apache.pirk.schema.response.QueryResponseJSON;
    +import org.apache.pirk.serialization.HadoopFileSystemStore;
    +import org.apache.pirk.test.distributed.DistributedTestDriver;
    +import org.apache.pirk.test.utils.TestUtils;
    +import org.apache.pirk.utils.PIRException;
    +import org.apache.pirk.utils.QueryResultsWriter;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to launch Map Reduce responder
    + */
    +public class MapReduceResponder implements ResponderPlugin
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class);
    +
    +  protected void deleteOutput(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      Path path = new Path(outputFile);
    +      if (fs.exists(path))
    +        fs.delete(path, true); // Ensure old output does not exist.
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to delete output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected Response getResponse(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      return new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("getResponse: Failed to open output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected File getTempFile() throws PIRException
    +  {
    +    try
    +    {
    +      return File.createTempFile("finalResultsFile", ".txt");
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to open temp file: finalResultsFile.txt." + e.getMessage());
    +    }
    +  }
    +
    +  protected void writeDecriptedResults(DecryptResponse decryptResponse, File output, int numThreads) throws PIRException
    --- End diff --
    
    I'm blaming that on my IDE :).



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by ellisonanne <gi...@git.apache.org>.
Github user ellisonanne commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83564967
  
    --- Diff: pirk-spark/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java ---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.responder.wideskies.spark.streaming;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.security.Permission;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.pirk.querier.wideskies.Querier;
    +import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
    +import org.apache.pirk.responder.wideskies.ResponderProps;
    +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
    +import org.apache.pirk.response.wideskies.Response;
    +import org.apache.pirk.schema.response.QueryResponseJSON;
    +import org.apache.pirk.serialization.HadoopFileSystemStore;
    +import org.apache.pirk.test.distributed.DistributedTestDriver;
    +import org.apache.pirk.test.utils.TestUtils;
    +import org.apache.pirk.utils.PIRException;
    +import org.apache.pirk.utils.QueryResultsWriter;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.apache.spark.launcher.SparkLauncher;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to launch stand alone responder
    + */
    +public class SparkStreamingResponder implements ResponderPlugin
    +{
    +
    +  private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class);
    +
    +  protected void deleteOutput(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      Path path = new Path(outputFile);
    +      if (fs.exists(path))
    +        fs.delete(path, true); // Ensure old output does not exist.
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to delete output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected Response getResponse(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      return new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("getResponse: Failed to open output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected File getTempFile() throws PIRException
    +  {
    +    try
    +    {
    +      return File.createTempFile("finalResultsFile", ".txt");
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to open temp file: finalResultsFile.txt." + e.getMessage());
    +    }
    +  }
    +
    +  protected void writeDecriptedResults(DecryptResponse decryptResponse, File output, int numThreads) throws PIRException
    --- End diff --
    
    "Decrypted" is misspelled.



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by ellisonanne <gi...@git.apache.org>.
Github user ellisonanne commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83564867
  
    --- Diff: pirk-mapreduce/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.responder.wideskies.mapreduce;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.ToolRunner;
    +import org.apache.pirk.querier.wideskies.Querier;
    +import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
    +import org.apache.pirk.responder.wideskies.ResponderProps;
    +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
    +import org.apache.pirk.response.wideskies.Response;
    +import org.apache.pirk.schema.response.QueryResponseJSON;
    +import org.apache.pirk.serialization.HadoopFileSystemStore;
    +import org.apache.pirk.test.distributed.DistributedTestDriver;
    +import org.apache.pirk.test.utils.TestUtils;
    +import org.apache.pirk.utils.PIRException;
    +import org.apache.pirk.utils.QueryResultsWriter;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to launch Map Reduce responder
    + */
    +public class MapReduceResponder implements ResponderPlugin
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class);
    +
    +  protected void deleteOutput(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      Path path = new Path(outputFile);
    +      if (fs.exists(path))
    +        fs.delete(path, true); // Ensure old output does not exist.
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to delete output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected Response getResponse(String outputFile, FileSystem fs) throws PIRException
    +  {
    +    try
    +    {
    +      return new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("getResponse: Failed to open output file " + outputFile + ". " + e.getMessage());
    +    }
    +  }
    +
    +  protected File getTempFile() throws PIRException
    +  {
    +    try
    +    {
    +      return File.createTempFile("finalResultsFile", ".txt");
    +    } catch (IOException e)
    +    {
    +      throw new PIRException("Failed to open temp file: finalResultsFile.txt." + e.getMessage());
    +    }
    +  }
    +
    +  protected void writeDecriptedResults(DecryptResponse decryptResponse, File output, int numThreads) throws PIRException
    --- End diff --
    
    Nitpick - decrypted is misspelled... ;)



[GitHub] incubator-pirk issue #108: WIP: Submodule Refactor Phase 1

Posted by ellisonanne <gi...@git.apache.org>.
Github user ellisonanne commented on the issue:

    https://github.com/apache/incubator-pirk/pull/108
  
    Looks like it's headed in the right direction - a few minor comments inline. I did not run the tests.
    
    Before the PR is completed, the LICENSE and NOTICE files need to be appropriately added to each submodule. 
    




[GitHub] incubator-pirk issue #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ commented on the issue:

    https://github.com/apache/incubator-pirk/pull/108
  
    Closing to change from WIP



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83766665
  
    --- Diff: pirk-core/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java ---
    @@ -18,12 +18,7 @@
      */
     package org.apache.pirk.test.distributed;
     
    -import org.apache.commons.cli.CommandLine;
    -import org.apache.commons.cli.CommandLineParser;
    -import org.apache.commons.cli.GnuParser;
    -import org.apache.commons.cli.HelpFormatter;
    -import org.apache.commons.cli.Option;
    -import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.*;
    --- End diff --
    
    yep, will do.



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ closed the pull request at:

    https://github.com/apache/incubator-pirk/pull/108



[GitHub] incubator-pirk pull request #108: WIP: Submodule Refactor Phase 1

Posted by DarinJ <gi...@git.apache.org>.
Github user DarinJ commented on a diff in the pull request:

    https://github.com/apache/incubator-pirk/pull/108#discussion_r83767176
  
    --- Diff: pirk-core/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---
    @@ -0,0 +1,334 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.pirk.test.distributed.testsuite;
    +
    +import java.util.List;
    +
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.pirk.inputformat.hadoop.InputFormatConst;
    +import org.apache.pirk.test.distributed.DistributedTestDriver;
    +import org.apache.pirk.test.utils.BaseTests;
    +import org.apache.pirk.utils.SystemConfiguration;
    +import org.json.simple.JSONObject;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Distributed test class for PIR
    + * 
    + */
    +public class DistTestSuite
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(DistTestSuite.class);
    +  private static final String MAPREDUCE = "mapreduce";
    +  private static final String SPARK = "spark";
    +  private static final String SPARKSTREAMING = "sparkstreaming";
    +
    +  public static void testJSONInput(String platform, FileSystem fs, List<JSONObject> dataElements) throws Exception
    +  {
    +    logger.info("Starting testJSONInput for " + platform);
    +
    +    // Pull original data and query schema properties
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "100");
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
    +
    +    // Set up base configs
    +    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT);
    +    SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY));
    +    SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0");
    +
    +    // Run tests
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 1);
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 1);
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 2);
    +
    +    BaseTests.testSRCIPQueryNoFilter(dataElements, fs, platform, true, 2);
    +
    +    // Test hit limits per selector
    +    // Test hit limits per selector
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 3);
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
    +
    +    // Test the local cache for modular exponentiation
    +    SystemConfiguration.setProperty("pir.useLocalCache", "true");
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 2);
    +    BaseTests.testSRCIPQuery(dataElements, fs, platform, true, 2);
    +    SystemConfiguration.setProperty("pir.useLocalCache", "false");
    +
    +    // Change query for NXDOMAIN
    +    SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:3");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, platform, true, 2);
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, platform, true, 2);
    +    SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0");
    +
    +    // Test the expTable cases
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +
    +    // In memory table
    +    if (platform.equals(MAPREDUCE))
    +    {
    +      SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +      SystemConfiguration.setProperty("pirTest.useExpLookupTable", "true");
    +      BaseTests.testDNSIPQuery(dataElements, fs, MAPREDUCE, true, 2);
    +    }
    +    // Create exp table in hdfs
    +    SystemConfiguration.setProperty("mapreduce.map.memory.mb", "10000");
    +    SystemConfiguration.setProperty("mapreduce.reduce.memory.mb", "10000");
    +    SystemConfiguration.setProperty("mapreduce.map.java.opts", "-Xmx9000m");
    +    SystemConfiguration.setProperty("mapreduce.reduce.java.opts", "-Xmx9000m");
    +
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pir.expCreationSplits", "50");
    +    SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150");
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 2);
    +
    +    // Reset exp properties
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +
    +    // Reset property
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +
    +    // Test embedded QuerySchema
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 1);
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 1);
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 1);
    +
    +    logger.info("Completed testJSONInput for platform {}", platform);
    +  }
    +
    +  public static void testESInput(String platform, FileSystem fs, List<JSONObject> dataElements) throws Exception
    +  {
    +    logger.info("Starting testESInputMR");
    +
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
    +
    +    // Set up ES configs
    +    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES);
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0");
    +    SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY));
    +
    +    // Run tests
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 1);
    +    BaseTests.testSRCIPQuery(dataElements, fs, platform, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 1);
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, platform, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, platform, true, 2);
    +
    +    // Change query for NXDOMAIN
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, platform, true, 3);
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, platform, true, 3);
    +
    +    logger.info("Completed testESInputMR");
    +  }
    +
    +  public static void testESInputMR(FileSystem fs, List<JSONObject> dataElements) throws Exception
    +  {
    +    logger.info("Starting testESInputMR");
    +
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
    +
    +    // Set up ES configs
    +    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES);
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0");
    +    SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY));
    +
    +    // Run tests
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, MAPREDUCE, true, 1);
    +    BaseTests.testSRCIPQuery(dataElements, fs, MAPREDUCE, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, MAPREDUCE, true, 1);
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, MAPREDUCE, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, MAPREDUCE, true, 2);
    +
    +    // Change query for NXDOMAIN
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, MAPREDUCE, true, 3);
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, MAPREDUCE, true, 3);
    +
    +    logger.info("Completed testESInputMR");
    +  }
    +
    +  public static void testESInputSpark(FileSystem fs, List<JSONObject> dataElements) throws Exception
    +  {
    +    logger.info("Starting testESInputSpark");
    +
    +    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
    +    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
    +
    +    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
    +    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
    +
    +    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
    +    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
    +
    +    // Set up ES configs
    +    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES);
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0");
    +    SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY));
    +
    +    // Run tests
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, SPARK, true, 1);
    +    BaseTests.testDNSIPQuery(dataElements, fs, SPARK, true, 1);
    +    BaseTests.testSRCIPQuery(dataElements, fs, SPARK, true, 2);
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSHostnameQuery(dataElements, fs, SPARK, true, 2);
    +    BaseTests.testDNSIPQuery(dataElements, fs, SPARK, true, 2);
    +
    +    // Change query for NXDOMAIN
    +    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
    +
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, SPARK, true, 3);
    +    SystemConfiguration.setProperty("pirTest.embedSelector", "false");
    +    BaseTests.testDNSNXDOMAINQuery(dataElements, fs, SPARK, true, 3);
    +
    +    logger.info("Completed testESInputSpark");
    +  }
    +
    +  public static void testSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
    +  {
    +    // testJSONInputSparkStreaming(fs, pirDataElements);
    --- End diff --
    
    That was some intermediate work; the test* functions are now dead code.  I generalized them into testJSONInput(String platform, FileSystem fs, List<JSONObject> dataElements) and testESInput(String platform, FileSystem fs, List<JSONObject> dataElements).
    
    I didn't delete them just yet as I was using them for reference.  Will do in the final PR.
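
The generalization described above can be sketched as follows: one method takes the platform as a parameter, so per-platform copies are no longer needed, and any platform-specific step becomes a small branch. This is only an illustration; `runQuery` is a hypothetical stand-in for the `BaseTests.*` calls, and the query names are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class PlatformParameterSketch
{
  private static final String MAPREDUCE = "mapreduce";

  /** Hypothetical stand-in for a BaseTests.* call; it only records what would be run. */
  public static String runQuery(String queryName, String platform)
  {
    return queryName + "@" + platform;
  }

  /** One generalized method in place of separate per-platform copies. */
  public static List<String> testInput(String platform)
  {
    List<String> executed = new ArrayList<>();
    executed.add(runQuery("dnsHostname", platform));
    executed.add(runQuery("dnsIP", platform));

    // A step that only applies to one platform stays as a branch.
    if (MAPREDUCE.equals(platform))
    {
      executed.add(runQuery("dnsIPWithInMemoryExpTable", platform));
    }
    return executed;
  }

  public static void main(String[] args)
  {
    for (String platform : new String[] {"mapreduce", "spark", "sparkstreaming"})
    {
      System.out.println(testInput(platform));
    }
  }
}
```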


