You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by te...@apache.org on 2016/09/23 10:13:40 UTC
incubator-pirk git commit: [PIRK-63]- Generalize ResponderDriver to
use a RespondLauncher class.
Repository: incubator-pirk
Updated Branches:
refs/heads/master bf003cb06 -> 43c772c45
[PIRK-63]- Generalize ResponderDriver to use a RespondLauncher class.
This closes apache/incubator-pirk#93
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/43c772c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/43c772c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/43c772c4
Branch: refs/heads/master
Commit: 43c772c45860393cb24e672dd7efd78e28d0de6f
Parents: bf003cb
Author: Darin Johnson <da...@apache.org>
Authored: Fri Sep 23 10:50:06 2016 +0100
Committer: Tim Ellison <t....@gmail.com>
Committed: Fri Sep 23 10:50:06 2016 +0100
----------------------------------------------------------------------
.../pirk/responder/wideskies/ResponderCLI.java | 6 +-
.../responder/wideskies/ResponderDriver.java | 116 +++----------------
.../responder/wideskies/ResponderProps.java | 10 +-
.../responder/wideskies/ResponderService.java | 73 ++++++++++++
.../wideskies/mapreduce/MapReduceResponder.java | 45 +++++++
.../wideskies/spark/SparkResponder.java | 55 +++++++++
.../streaming/SparkStreamingResponder.java | 92 +++++++++++++++
.../wideskies/spi/ResponderPlugin.java | 40 +++++++
.../standalone/StandaloneResponder.java | 58 ++++++++++
.../wideskies/storm/StormResponder.java | 45 +++++++
...pirk.responder.wideskies.spi.ResponderPlugin | 5 +
src/main/resources/responder.properties | 10 +-
12 files changed, 442 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index 5ba8170..514491a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -111,7 +111,7 @@ public class ResponderCLI
* Method to parse and validate the options provided
*
* @return - true if valid, false otherwise
- * @throws IOException
+ * @throws IOException
*/
private boolean parseOptions() throws IOException
{
@@ -170,14 +170,14 @@ public class ResponderCLI
optionLocalPropFile.setArgName(LOCALPROPFILE);
optionLocalPropFile.setType(String.class);
options.addOption(optionLocalPropFile);
-
+
// hdfsPropertiesDir
Option optionHDFSPropDir = new Option("hdfsPropsDir", HDFSPROPDIR, true, "Optional location of directory in hdfs containing properties file(s)");
optionHDFSPropDir.setRequired(false);
optionHDFSPropDir.setArgName(HDFSPROPDIR);
optionHDFSPropDir.setType(String.class);
options.addOption(optionHDFSPropDir);
-
+
// hdfsPropertiesFile
Option optionHDFSPropFile = new Option("hdfsPropsFile", HDFSPROPFILE, true, "Optional location of properties file(s) in hdfs");
optionHDFSPropFile.setRequired(false);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 044012d..02dbf2e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -18,18 +18,8 @@
*/
package org.apache.pirk.responder.wideskies;
-import java.security.Permission;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.pirk.query.wideskies.Query;
-import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
-import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
-import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
-import org.apache.pirk.responder.wideskies.standalone.Responder;
-import org.apache.pirk.responder.wideskies.storm.PirkTopology;
-import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,103 +40,31 @@ public class ResponderDriver
{
private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
- private enum Platform
- {
- MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
- }
-
- public static void main(String[] args) throws Exception
+ public static void main(String[] args)
{
ResponderCLI responderCLI = new ResponderCLI(args);
- // For handling System.exit calls from Spark Streaming
- System.setSecurityManager(new SystemExitManager());
-
- Platform platform = Platform.NONE;
- String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM);
+ String platformName = SystemConfiguration.getProperty(ResponderProps.PLATFORM, "None");
+ logger.info("Attempting to use platform {} ...", platformName);
try
{
- platform = Platform.valueOf(platformString.toUpperCase());
- } catch (IllegalArgumentException e)
- {
- logger.error("platform " + platformString + " not found.");
- }
-
- logger.info("platform = " + platform);
- switch (platform)
- {
- case MAPREDUCE:
- logger.info("Launching MapReduce ResponderTool:");
-
- ComputeResponseTool pirWLTool = new ComputeResponseTool();
- ToolRunner.run(pirWLTool, new String[] {});
- break;
-
- case SPARK:
- logger.info("Launching Spark ComputeResponse:");
-
- ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
- computeResponse.performQuery();
- break;
-
- case SPARKSTREAMING:
- logger.info("Launching Spark ComputeStreamingResponse:");
-
- ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
- try
- {
- computeSR.performQuery();
- } catch (SystemExitException e)
- {
- // If System.exit(0) is not caught from Spark Streaming,
- // the application will complete with a 'failed' status
- logger.info("Exited with System.exit(0) from Spark Streaming");
- }
-
- // Teardown the context
- computeSR.teardown();
- break;
-
- case STORM:
- logger.info("Launching Storm PirkTopology:");
- PirkTopology.runPirkTopology();
- break;
-
- case STANDALONE:
- logger.info("Launching Standalone Responder:");
-
- String queryInput = SystemConfiguration.getProperty("pir.queryInput");
- Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
-
- Responder pirResponder = new Responder(query);
- pirResponder.computeStandaloneResponse();
- break;
- }
- }
-
- // Exception and Security Manager classes used to catch System.exit from Spark Streaming
- private static class SystemExitException extends SecurityException
- {}
-
- private static class SystemExitManager extends SecurityManager
- {
- @Override
- public void checkPermission(Permission perm)
- {}
-
- @Override
- public void checkExit(int status)
- {
- super.checkExit(status);
- if (status == 0) // If we exited cleanly, throw SystemExitException
+ ResponderPlugin responder = ResponderService.getInstance().getResponder(platformName);
+ if (responder == null)
{
- throw new SystemExitException();
+ logger.error("No such platform plugin found: {}!", platformName);
}
else
{
- throw new SecurityException();
+ responder.run();
}
-
+ }
+ catch (PIRException pirEx)
+ {
+ logger.error("Failed to load platform plugin: {}! {}", platformName, pirEx.getMessage());
+ }
+ catch (Exception ex)
+ {
+ logger.error("Failed to run platform plugin: {}! {}", platformName, ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 7cba88e..a9eb80d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -128,15 +128,7 @@ public class ResponderProps
if (!SystemConfiguration.hasProperty(PLATFORM))
{
- logger.info("Must have the option " + PLATFORM);
- valid = false;
- }
-
- String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
- if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm")
- && !platform.equals("standalone"))
- {
- logger.info("Unsupported platform: " + platform);
+ logger.info("Must have the option {}", PLATFORM);
valid = false;
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
new file mode 100644
index 0000000..129b1c9
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java
@@ -0,0 +1,73 @@
+package org.apache.pirk.responder.wideskies;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResponderService
+{
+ private static final Logger logger = LoggerFactory.getLogger(ResponderService.class);
+
+ // Singleton for the responder service.
+ private static ResponderService service;
+ private ServiceLoader<ResponderPlugin> loader;
+
+ public static synchronized ResponderService getInstance()
+ {
+ if (service == null)
+ {
+ service = new ResponderService();
+ }
+ return service;
+ }
+
+ private ResponderService()
+ {
+ loader = ServiceLoader.load(ResponderPlugin.class);
+ }
+
+ public ResponderPlugin getResponder(String platformName) throws PIRException
+ {
+ try
+ {
+ for(ResponderPlugin plugin : loader)
+ {
+ if (platformName.equalsIgnoreCase(plugin.getPlatformName()))
+ {
+ logger.debug("Found {}, in {}", platformName, plugin.getClass().getName());
+ return plugin;
+ }
+ }
+ }
+ catch (ServiceConfigurationError e)
+ {
+ logger.error("ResponderPlugin configuration error {}", e);
+ throw new PIRException(e);
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
new file mode 100644
index 0000000..fc1d20b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.mapreduce;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch Map Reduce responder
+ */
+public class MapReduceResponder implements ResponderPlugin
+{
+ private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class);
+
+ @Override
+ public String getPlatformName() {
+ return "mapreduce";
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ logger.info("Launching MapReduce ResponderTool:");
+ ComputeResponseTool pirWLTool = new ComputeResponseTool();
+ ToolRunner.run(pirWLTool, new String[] {});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
new file mode 100644
index 0000000..bd05236
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch spark responder
+ */
+public class SparkResponder implements ResponderPlugin
+{
+ private static final Logger logger = LoggerFactory.getLogger(SparkResponder.class);
+
+ @Override
+ public String getPlatformName() {
+ return "spark";
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ logger.info("Launching Spark ComputeResponse:");
+ try
+ {
+ ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
+ computeResponse.performQuery();
+ }
+ catch (IOException e)
+ {
+ logger.error("Unable to open filesystem: {}", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
new file mode 100644
index 0000000..295a3cf
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.responder.wideskies.ResponderDriver;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Permission;
+
+/**
+ * Class to launch stand alone responder
+ */
+public class SparkStreamingResponder implements ResponderPlugin
+{
+ private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class);
+
+ @Override
+ public String getPlatformName() {
+ return "sparkstreaming";
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ // For handling System.exit calls from Spark Streaming
+ System.setSecurityManager(new SystemExitManager());
+ logger.info("Launching Spark ComputeStreamingResponse:");
+ ComputeStreamingResponse computeSR = null;
+ try
+ {
+ computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+ computeSR.performQuery();
+ }
+ catch (SystemExitException e)
+ {
+ // If System.exit(0) is not caught from Spark Streaming,
+ // the application will complete with a 'failed' status
+ logger.info("Exited with System.exit(0) from Spark Streaming");
+ }
+ finally
+ {
+ // Teardown the context
+ if (computeSR != null)
+ computeSR.teardown();
+ }
+ }
+
+ // Exception and Security Manager classes used to catch System.exit from Spark Streaming
+ private static class SystemExitException extends SecurityException
+ {}
+
+ private static class SystemExitManager extends SecurityManager
+ {
+ @Override
+ public void checkPermission(Permission perm)
+ {}
+
+ @Override
+ public void checkExit(int status)
+ {
+ super.checkExit(status);
+ if (status == 0) // If we exited cleanly, throw SystemExitException
+ {
+ throw new SystemExitException();
+ }
+ else
+ {
+ throw new SecurityException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
new file mode 100644
index 0000000..3dade0d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.spi;
+
+/**
+ * Interface which launches a responder
+ * <p>
+ * Implement this interface to start the execution of a framework responder, the run method will be called via reflection by the ResponderDriver.
+ * </p>
+ */
+public interface ResponderPlugin
+{
+ /**
+ * Returns the plugin name for your framework
+ * This will be the platform argument
+ * @return
+ */
+ public String getPlatformName();
+ /**
+ * This method launches your framework responder.
+ */
+ public void run() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
new file mode 100644
index 0000000..5214c5f
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.standalone;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Class to launch stand alone responder
+ */
+public class StandaloneResponder implements ResponderPlugin
+{
+ private static final Logger logger = LoggerFactory.getLogger(StandaloneResponder.class);
+
+ @Override
+ public String getPlatformName() {
+ return "standalone";
+ }
+
+ @Override
+ public void run()
+ {
+ logger.info("Launching Standalone Responder:");
+ String queryInput = SystemConfiguration.getProperty("pir.queryInput");
+ try
+ {
+ Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+ Responder pirResponder = new Responder(query);
+ pirResponder.computeStandaloneResponse();
+ }
+ catch (IOException e)
+ {
+ logger.error("Error reading {}, {}", queryInput, e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
new file mode 100644
index 0000000..08400ac
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to launch Storm responder
+ */
+public class StormResponder implements ResponderPlugin
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(StormResponder.class);
+ @Override
+ public String getPlatformName()
+ {
+ return "storm";
+ }
+
+ @Override
+ public void run() throws Exception
+ {
+ logger.info("Launching Storm PirkTopology:");
+ PirkTopology.runPirkTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
----------------------------------------------------------------------
diff --git a/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
new file mode 100644
index 0000000..33aff36
--- /dev/null
+++ b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin
@@ -0,0 +1,5 @@
+org.apache.pirk.responder.wideskies.mapreduce.MapReduceResponder
+org.apache.pirk.responder.wideskies.spark.SparkResponder
+org.apache.pirk.responder.wideskies.spark.streaming.SparkStreamingResponder
+org.apache.pirk.responder.wideskies.standalone.StandaloneResponder
+org.apache.pirk.responder.wideskies.storm.StormResponder
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/responder.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index ac6cb35..bb38b1d 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -27,9 +27,15 @@ pir.dataInputFormat=
#outputFile -- required -- Fully qualified name of output file in hdfs
pir.outputFile=
-#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone'
+#One of the following two options is required - launcher prefered
+
+#launcher -- required -- full class name of a class implementing ResponderPlugin
+#ie. org.apache.pirk.responder.wideskies.standalone.StandaloneResponderPluginProcessing platform technology for the responder
+#launcher=
+
+#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', 'standalone', or 'storm'
#Processing platform technology for the responder
-platform=
+platform=
#queryInput -- required -- Fully qualified dir in hdfs of Query files
pir.queryInput=