You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by te...@apache.org on 2016/09/23 10:13:40 UTC

incubator-pirk git commit: [PIRK-63]- Generalize ResponderDriver to use a RespondLauncher class.

Repository: incubator-pirk
Updated Branches:
  refs/heads/master bf003cb06 -> 43c772c45


[PIRK-63]- Generalize ResponderDriver to use a RespondLauncher class.

This closes apache/incubator-pirk#93


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/43c772c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/43c772c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/43c772c4

Branch: refs/heads/master
Commit: 43c772c45860393cb24e672dd7efd78e28d0de6f
Parents: bf003cb
Author: Darin Johnson <da...@apache.org>
Authored: Fri Sep 23 10:50:06 2016 +0100
Committer: Tim Ellison <t....@gmail.com>
Committed: Fri Sep 23 10:50:06 2016 +0100

----------------------------------------------------------------------
 .../pirk/responder/wideskies/ResponderCLI.java  |   6 +-
 .../responder/wideskies/ResponderDriver.java    | 116 +++----------------
 .../responder/wideskies/ResponderProps.java     |  10 +-
 .../responder/wideskies/ResponderService.java   |  73 ++++++++++++
 .../wideskies/mapreduce/MapReduceResponder.java |  45 +++++++
 .../wideskies/spark/SparkResponder.java         |  55 +++++++++
 .../streaming/SparkStreamingResponder.java      |  92 +++++++++++++++
 .../wideskies/spi/ResponderPlugin.java          |  40 +++++++
 .../standalone/StandaloneResponder.java         |  58 ++++++++++
 .../wideskies/storm/StormResponder.java         |  45 +++++++
 ...pirk.responder.wideskies.spi.ResponderPlugin |   5 +
 src/main/resources/responder.properties         |  10 +-
 12 files changed, 442 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index 5ba8170..514491a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -111,7 +111,7 @@ public class ResponderCLI
    * Method to parse and validate the options provided
    *
    * @return - true if valid, false otherwise
- * @throws IOException 
+ * @throws IOException
    */
   private boolean parseOptions() throws IOException
   {
@@ -170,14 +170,14 @@ public class ResponderCLI
     optionLocalPropFile.setArgName(LOCALPROPFILE);
     optionLocalPropFile.setType(String.class);
     options.addOption(optionLocalPropFile);
-    
+
     // hdfsPropertiesDir
     Option optionHDFSPropDir = new Option("hdfsPropsDir", HDFSPROPDIR, true, "Optional location of directory in hdfs containing properties file(s)");
     optionHDFSPropDir.setRequired(false);
     optionHDFSPropDir.setArgName(HDFSPROPDIR);
     optionHDFSPropDir.setType(String.class);
     options.addOption(optionHDFSPropDir);
-    
+
     // hdfsPropertiesFile
     Option optionHDFSPropFile = new Option("hdfsPropsFile", HDFSPROPFILE, true, "Optional location of properties file(s) in hdfs");
     optionHDFSPropFile.setRequired(false);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 044012d..02dbf2e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -18,18 +18,8 @@
  */
 package org.apache.pirk.responder.wideskies;
 
-import java.security.Permission;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
-import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
-import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
-import org.apache.pirk.responder.wideskies.standalone.Responder;
-import org.apache.pirk.responder.wideskies.storm.PirkTopology;
-import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,103 +40,31 @@ public class ResponderDriver
 {
   private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
 
-  private enum Platform
-  {
-    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
-  }
-
-  public static void main(String[] args) throws Exception
+  public static void main(String[] args)
   {
     ResponderCLI responderCLI = new ResponderCLI(args);
 
-    // For handling System.exit calls from Spark Streaming
-    System.setSecurityManager(new SystemExitManager());
-
-    Platform platform = Platform.NONE;
-    String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM);
+    String platformName = SystemConfiguration.getProperty(ResponderProps.PLATFORM, "None");
+    logger.info("Attempting to use platform {} ...", platformName);
     try
     {
-      platform = Platform.valueOf(platformString.toUpperCase());
-    } catch (IllegalArgumentException e)
-    {
-      logger.error("platform " + platformString + " not found.");
-    }
-
-    logger.info("platform = " + platform);
-    switch (platform)
-    {
-      case MAPREDUCE:
-        logger.info("Launching MapReduce ResponderTool:");
-
-        ComputeResponseTool pirWLTool = new ComputeResponseTool();
-        ToolRunner.run(pirWLTool, new String[] {});
-        break;
-
-      case SPARK:
-        logger.info("Launching Spark ComputeResponse:");
-
-        ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
-        computeResponse.performQuery();
-        break;
-
-      case SPARKSTREAMING:
-        logger.info("Launching Spark ComputeStreamingResponse:");
-
-        ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
-        try
-        {
-          computeSR.performQuery();
-        } catch (SystemExitException e)
-        {
-          // If System.exit(0) is not caught from Spark Streaming,
-          // the application will complete with a 'failed' status
-          logger.info("Exited with System.exit(0) from Spark Streaming");
-        }
-
-        // Teardown the context
-        computeSR.teardown();
-        break;
-
-      case STORM:
-        logger.info("Launching Storm PirkTopology:");
-        PirkTopology.runPirkTopology();
-        break;
-
-      case STANDALONE:
-        logger.info("Launching Standalone Responder:");
-
-        String queryInput = SystemConfiguration.getProperty("pir.queryInput");
-        Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
-
-        Responder pirResponder = new Responder(query);
-        pirResponder.computeStandaloneResponse();
-        break;
-    }
-  }
-
-  // Exception and Security Manager classes used to catch System.exit from Spark Streaming
-  private static class SystemExitException extends SecurityException
-  {}
-
-  private static class SystemExitManager extends SecurityManager
-  {
-    @Override
-    public void checkPermission(Permission perm)
-    {}
-
-    @Override
-    public void checkExit(int status)
-    {
-      super.checkExit(status);
-      if (status == 0) // If we exited cleanly, throw SystemExitException
+      ResponderPlugin responder = ResponderService.getInstance().getResponder(platformName);
+      if (responder == null)
       {
-        throw new SystemExitException();
+        logger.error("No such platform plugin found: {}!", platformName);
       }
       else
       {
-        throw new SecurityException();
+        responder.run();
       }
-
+    }
+    catch (PIRException pirEx)
+    {
+      logger.error("Failed to load platform plugin: {}! {}", platformName, pirEx.getMessage());
+    }
+    catch (Exception ex)
+    {
+      logger.error("Failed to run platform plugin: {}! {}", platformName, ex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 7cba88e..a9eb80d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -128,15 +128,7 @@ public class ResponderProps
 
     if (!SystemConfiguration.hasProperty(PLATFORM))
     {
-      logger.info("Must have the option " + PLATFORM);
-      valid = false;
-    }
-
-    String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm")
-        && !platform.equals("standalone"))
-    {
-      logger.info("Unsupported platform: " + platform);
+      logger.info("Must have the option {}", PLATFORM);
       valid = false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
new file mode 100644
index 0000000..129b1c9
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
@@ -0,0 +1,73 @@
+package org.apache.pirk.responder.wideskies;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResponderService
+{
+  private static final Logger logger = LoggerFactory.getLogger(ResponderService.class);
+
+  // Singleton for the responder service.
+  private static ResponderService service;
+  private ServiceLoader<ResponderPlugin> loader;
+
+  public static synchronized ResponderService getInstance()
+  {
+    if (service == null)
+    {
+      service = new ResponderService();
+    }
+    return service;
+  }
+
+  private ResponderService()
+  {
+    loader = ServiceLoader.load(ResponderPlugin.class);
+  }
+
+  public ResponderPlugin getResponder(String platformName) throws PIRException
+  {
+    try
+    {
+      for(ResponderPlugin plugin : loader)
+      {
+        if (platformName.equalsIgnoreCase(plugin.getPlatformName()))
+        {
+          logger.debug("Found {}, in {}", platformName, plugin.getClass().getName());
+          return plugin;
+        }
+      }
+    }
+    catch (ServiceConfigurationError e)
+    {
+      logger.error("ResponderPlugin configuration error {}", e);
+      throw new PIRException(e);
+    }
+    return null;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
new file mode 100644
index 0000000..fc1d20b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.mapreduce;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch Map Reduce responder
+ */
+public class MapReduceResponder implements ResponderPlugin
+{
+  private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class);
+
+  @Override
+  public String getPlatformName() {
+    return "mapreduce";
+  }
+
+  @Override
+  public void run() throws Exception
+  {
+    logger.info("Launching MapReduce ResponderTool:");
+    ComputeResponseTool pirWLTool = new ComputeResponseTool();
+    ToolRunner.run(pirWLTool, new String[] {});
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
new file mode 100644
index 0000000..bd05236
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch spark responder
+ */
+public class SparkResponder implements ResponderPlugin
+{
+  private static final Logger logger = LoggerFactory.getLogger(SparkResponder.class);
+
+  @Override
+  public String getPlatformName() {
+    return "spark";
+  }
+
+  @Override
+  public void run() throws Exception
+  {
+    logger.info("Launching Spark ComputeResponse:");
+    try
+    {
+      ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
+      computeResponse.performQuery();
+    }
+    catch (IOException e)
+    {
+      logger.error("Unable to open filesystem: {}", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
new file mode 100644
index 0000000..295a3cf
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.responder.wideskies.ResponderDriver;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Permission;
+
+/**
+ * Class to launch stand alone responder
+ */
+public class SparkStreamingResponder implements ResponderPlugin
+{
+  private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class);
+
+  @Override
+  public String getPlatformName() {
+    return "sparkstreaming";
+  }
+
+  @Override
+  public void run() throws Exception
+  {
+    // For handling System.exit calls from Spark Streaming
+    System.setSecurityManager(new SystemExitManager());
+    logger.info("Launching Spark ComputeStreamingResponse:");
+    ComputeStreamingResponse computeSR = null;
+    try
+    {
+      computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+      computeSR.performQuery();
+    }
+    catch (SystemExitException e)
+    {
+      // If System.exit(0) is not caught from Spark Streaming,
+      // the application will complete with a 'failed' status
+      logger.info("Exited with System.exit(0) from Spark Streaming");
+    }
+    finally
+    {
+      // Teardown the context
+      if (computeSR != null)
+        computeSR.teardown();
+    }
+  }
+
+  // Exception and Security Manager classes used to catch System.exit from Spark Streaming
+  private static class SystemExitException extends SecurityException
+  {}
+
+  private static class SystemExitManager extends SecurityManager
+  {
+    @Override
+    public void checkPermission(Permission perm)
+    {}
+
+    @Override
+    public void checkExit(int status)
+    {
+      super.checkExit(status);
+      if (status == 0) // If we exited cleanly, throw SystemExitException
+      {
+        throw new SystemExitException();
+      }
+      else
+      {
+        throw new SecurityException();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
new file mode 100644
index 0000000..3dade0d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.spi;
+
+/**
+ * Interface which launches a responder
+ * <p>
+ * Implement this interface to start the execution of a framework responder, the run method will be called via reflection by the ResponderDriver.
+ * </p>
+ */
+public interface ResponderPlugin
+{
+  /**
+   * Returns the plugin name for your framework
+   * This will be the platform argument
+   * @return
+   */
+  public String getPlatformName();
+  /**
+   * This method launches your framework responder.
+   */
+  public void run() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
new file mode 100644
index 0000000..5214c5f
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.standalone;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Class to launch stand alone responder
+ */
+public class StandaloneResponder implements ResponderPlugin
+{
+  private static final Logger logger = LoggerFactory.getLogger(StandaloneResponder.class);
+
+  @Override
+  public String getPlatformName() {
+    return "standalone";
+  }
+
+  @Override
+  public void run()
+  {
+    logger.info("Launching Standalone Responder:");
+    String queryInput = SystemConfiguration.getProperty("pir.queryInput");
+    try
+    {
+      Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+      Responder pirResponder = new Responder(query);
+      pirResponder.computeStandaloneResponse();
+    }
+    catch (IOException e)
+    {
+      logger.error("Error reading {}, {}", queryInput, e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
new file mode 100644
index 0000000..08400ac
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch Storm responder
+ */
+public class StormResponder implements ResponderPlugin
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(StormResponder.class);
+  @Override
+  public String getPlatformName()
+  {
+    return "storm";
+  }
+
+  @Override
+  public void run() throws Exception
+  {
+    logger.info("Launching Storm PirkTopology:");
+    PirkTopology.runPirkTopology();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
----------------------------------------------------------------------
diff --git a/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
new file mode 100644
index 0000000..33aff36
--- /dev/null
+++ b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
@@ -0,0 +1,5 @@
+org.apache.pirk.responder.wideskies.mapreduce.MapReduceResponder
+org.apache.pirk.responder.wideskies.spark.SparkResponder
+org.apache.pirk.responder.wideskies.spark.streaming.SparkStreamingResponder
+org.apache.pirk.responder.wideskies.standalone.StandaloneResponder
+org.apache.pirk.responder.wideskies.storm.StormResponder
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/responder.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index ac6cb35..bb38b1d 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -27,9 +27,15 @@ pir.dataInputFormat=
 #outputFile -- required -- Fully qualified name of output file in hdfs
 pir.outputFile=
 
-#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone'
+#One of the following two options is required - launcher prefered
+
+#launcher -- required -- full class name of a class implementing ResponderPlugin
+#ie. org.apache.pirk.responder.wideskies.standalone.StandaloneResponderPluginProcessing platform technology for the responder
+#launcher=
+
+#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', 'standalone', or 'storm'
 #Processing platform technology for the responder                
-platform= 
+platform=
 
 #queryInput -- required -- Fully qualified dir in hdfs of Query files
 pir.queryInput=