You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by ea...@apache.org on 2016/09/23 22:39:39 UTC
incubator-pirk git commit: Narrow ResponderPlugin run method to throw PIRException -- closes apache/incubator-pirk#99
Repository: incubator-pirk
Updated Branches:
refs/heads/master 43c772c45 -> 98b1b4c36
Narrow ResponderPlugin run method to throw PIRException -- closes apache/incubator-pirk#99
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/98b1b4c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/98b1b4c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/98b1b4c3
Branch: refs/heads/master
Commit: 98b1b4c36d5ff10ec86f7e24225c5c42ac6004fa
Parents: 43c772c
Author: tellison <te...@apache.org>
Authored: Fri Sep 23 18:39:31 2016 -0400
Committer: eawilliams <ea...@apache.org>
Committed: Fri Sep 23 18:39:31 2016 -0400
----------------------------------------------------------------------
.../mapreduce/ComputeResponseTool.java | 3 +-
.../wideskies/mapreduce/MapReduceResponder.java | 14 +++-
.../wideskies/spark/ComputeResponse.java | 27 ++++---
.../wideskies/spark/SparkResponder.java | 3 +-
.../streaming/ComputeStreamingResponse.java | 52 ++++++++-----
.../streaming/SparkStreamingResponder.java | 28 +++++--
.../wideskies/spi/ResponderPlugin.java | 4 +-
.../responder/wideskies/storm/PirkTopology.java | 19 ++++-
.../wideskies/storm/StormResponder.java | 3 +-
.../pirk/schema/data/DataSchemaLoader.java | 81 ++++++++++----------
.../pirk/schema/query/QuerySchemaLoader.java | 81 ++++++++++----------
11 files changed, 189 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 80e8a13..21cf518 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -54,6 +54,7 @@ import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.FileConst;
import org.apache.pirk.utils.HDFS;
+import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class ComputeResponseTool extends Configured implements Tool
private QueryInfo queryInfo = null;
private QuerySchema qSchema = null;
- public ComputeResponseTool() throws Exception
+ public ComputeResponseTool() throws IOException, PIRException
{
setupParameters();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
index fc1d20b..fcbc88b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -20,6 +20,7 @@ package org.apache.pirk.responder.wideskies.mapreduce;
import org.apache.hadoop.util.ToolRunner;
import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,10 +37,17 @@ public class MapReduceResponder implements ResponderPlugin
}
@Override
- public void run() throws Exception
+ public void run() throws PIRException
{
logger.info("Launching MapReduce ResponderTool:");
- ComputeResponseTool pirWLTool = new ComputeResponseTool();
- ToolRunner.run(pirWLTool, new String[] {});
+ try
+ {
+ ComputeResponseTool pirWLTool = new ComputeResponseTool();
+ ToolRunner.run(pirWLTool, new String[] {});
+ } catch (Exception e)
+ {
+ // An exception occurred invoking the tool, don't know how to recover.
+ throw new PIRException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index d6593f7..00bc5c1 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -99,7 +99,7 @@ public class ComputeResponse
private boolean colMultReduceByKey = false;
- public ComputeResponse(FileSystem fileSys) throws Exception
+ public ComputeResponse(FileSystem fileSys) throws PIRException
{
fs = fileSys;
storage = new HadoopFileSystemStore(fs);
@@ -156,12 +156,18 @@ public class ComputeResponse
// Setup, run query, teardown
logger.info("Setting up for query run");
- setup();
+ try
+ {
+ setup();
+ } catch (IOException e)
+ {
+ throw new PIRException("An error occurred setting up the Spark responder.", e);
+ }
logger.info("Setup complete");
}
// Setup for the accumulators and broadcast variables
- private void setup() throws Exception
+ private void setup() throws IOException, PIRException
{
// Load the schemas
DataSchemaLoader.initialize(true, fs);
@@ -219,7 +225,7 @@ public class ComputeResponse
/**
* Method to read in data from an allowed input source/format and perform the query
*/
- public void performQuery() throws Exception
+ public void performQuery() throws IOException, PIRException
{
logger.info("Performing query: ");
@@ -243,7 +249,7 @@ public class ComputeResponse
* Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
*/
@SuppressWarnings("unchecked")
- public JavaRDD<MapWritable> readData() throws Exception
+ public JavaRDD<MapWritable> readData() throws IOException, PIRException
{
logger.info("Reading data ");
@@ -268,10 +274,13 @@ public class ComputeResponse
// Set the inputFormatClass based upon the baseInputFormat property
String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
- Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
- if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+ Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+ try
+ {
+ inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+ } catch (ClassNotFoundException | ClassCastException e)
{
- throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+ throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
}
job.setInputFormatClass(inputClass);
@@ -296,7 +305,7 @@ public class ComputeResponse
* Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
*/
@SuppressWarnings("unchecked")
- public JavaRDD<MapWritable> readDataES() throws Exception
+ public JavaRDD<MapWritable> readDataES() throws IOException, PIRException
{
logger.info("Reading data ");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
index bd05236..fce905d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class SparkResponder implements ResponderPlugin
}
@Override
- public void run() throws Exception
+ public void run() throws PIRException
{
logger.info("Launching Spark ComputeResponse:");
try
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index b91cc68..c291df0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -43,10 +43,12 @@ import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.DataSchemaLoader;
import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.QuerySchemaLoader;
import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -106,7 +108,7 @@ public class ComputeStreamingResponse
private boolean colMultReduceByKey = false;
- public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+ public ComputeStreamingResponse(FileSystem fileSys) throws PIRException
{
fs = fileSys;
storage = new HadoopFileSystemStore(fs);
@@ -171,12 +173,18 @@ public class ComputeStreamingResponse
// Setup, run query, teardown
logger.info("Setting up for query run");
- setup();
+ try
+ {
+ setup();
+ } catch (IOException e)
+ {
+ throw new PIRException("An error occurred setting up the streaming responder.", e);
+ }
logger.info("Setup complete");
}
// Setup for the accumulators and broadcast variables
- private void setup() throws Exception
+ private void setup() throws IOException, PIRException
{
// Load the schemas
DataSchemaLoader.initialize(true, fs);
@@ -191,11 +199,6 @@ public class ComputeStreamingResponse
QueryInfo queryInfo = query.getQueryInfo();
bVars.setQuery(query);
bVars.setQueryInfo(queryInfo);
-
- if(query == null)
- {
- logger.info("query is null for queryInput = " + queryInput);
- }
if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
{
@@ -235,15 +238,20 @@ public class ComputeStreamingResponse
/**
* Method to start the computation
- *
- * @throws InterruptedException
*/
- public void start() throws InterruptedException
+ public void start()
{
logger.info("Starting computation...");
jssc.start();
- jssc.awaitTermination();
+ try
+ {
+ jssc.awaitTermination();
+ } catch (InterruptedException e)
+ {
+ // Interrupted while waiting for termination
+ Thread.interrupted();
+ }
}
/**
@@ -259,7 +267,7 @@ public class ComputeStreamingResponse
/**
* Method to read in data from an allowed input source/format and perform the query
*/
- public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
+ public void performQuery() throws IOException, PIRException
{
logger.info("Performing query: ");
@@ -272,6 +280,9 @@ public class ComputeStreamingResponse
{
inputRDD = readDataES();
}
+ else {
+ throw new PIRException("Unknown data input format " + dataInputFormat);
+ }
performQuery(inputRDD);
}
@@ -280,7 +291,7 @@ public class ComputeStreamingResponse
* Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
*/
@SuppressWarnings("unchecked")
- public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
+ public JavaDStream<MapWritable> readData() throws IOException, PIRException
{
logger.info("Reading data ");
@@ -296,10 +307,13 @@ public class ComputeStreamingResponse
// Set the inputFormatClass based upon the baseInputFormat property
String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
- Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
- if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+ Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+ try
+ {
+ inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+ } catch (ClassNotFoundException | ClassCastException e)
{
- throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+ throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
}
job.setInputFormatClass(inputClass);
@@ -397,10 +411,8 @@ public class ComputeStreamingResponse
/**
* Method to perform the query given an input JavaDStream of JSON
*
- * @throws InterruptedException
- *
*/
- public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
+ public void performQuery(JavaDStream<MapWritable> input)
{
logger.info("Performing query: ");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
index 295a3cf..4ce0571 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -18,15 +18,16 @@
*/
package org.apache.pirk.responder.wideskies.spark.streaming;
+import java.io.IOException;
+import java.security.Permission;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.pirk.responder.wideskies.ResponderDriver;
import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.security.Permission;
-
/**
* Class to launch stand alone responder
*/
@@ -40,15 +41,25 @@ public class SparkStreamingResponder implements ResponderPlugin
}
@Override
- public void run() throws Exception
+ public void run() throws PIRException
{
// For handling System.exit calls from Spark Streaming
System.setSecurityManager(new SystemExitManager());
+
+ FileSystem fileSys;
+ try
+ {
+ fileSys = FileSystem.get(new Configuration());
+ } catch (IOException e)
+ {
+ throw new PIRException(e);
+ }
+
logger.info("Launching Spark ComputeStreamingResponse:");
ComputeStreamingResponse computeSR = null;
try
{
- computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+ computeSR = new ComputeStreamingResponse(fileSys);
computeSR.performQuery();
}
catch (SystemExitException e)
@@ -56,6 +67,9 @@ public class SparkStreamingResponder implements ResponderPlugin
// If System.exit(0) is not caught from Spark Streaming,
// the application will complete with a 'failed' status
logger.info("Exited with System.exit(0) from Spark Streaming");
+ } catch (IOException e)
+ {
+ throw new PIRException(e);
}
finally
{
@@ -67,7 +81,9 @@ public class SparkStreamingResponder implements ResponderPlugin
// Exception and Security Manager classes used to catch System.exit from Spark Streaming
private static class SystemExitException extends SecurityException
- {}
+ {
+ private static final long serialVersionUID = 1L;
+ }
private static class SystemExitManager extends SecurityManager
{
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
index 3dade0d..912850a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -19,6 +19,8 @@
package org.apache.pirk.responder.wideskies.spi;
+import org.apache.pirk.utils.PIRException;
+
/**
* Interface which launches a responder
* <p>
@@ -36,5 +38,5 @@ public interface ResponderPlugin
/**
* This method launches your framework responder.
*/
- public void run() throws Exception;
+ public void run() throws PIRException;
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index e0f83d3..fd23c7e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -19,11 +19,18 @@
package org.apache.pirk.responder.wideskies.storm;
import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
@@ -61,7 +68,7 @@ public class PirkTopology
private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
- public static void runPirkTopology() throws Exception
+ public static void runPirkTopology() throws PIRException
{
// Set up Kafka parameters
logger.info("Configuring Kafka.");
@@ -85,7 +92,13 @@ public class PirkTopology
// Run topology
logger.info("Submitting Pirk topology to Storm...");
- StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+ try
+ {
+ StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+ } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e)
+ {
+ throw new PIRException(e);
+ }
} // main
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
index 08400ac..b988ccc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -20,6 +20,7 @@
package org.apache.pirk.responder.wideskies.storm;
import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +38,7 @@ public class StormResponder implements ResponderPlugin
}
@Override
- public void run() throws Exception
+ public void run() throws PIRException
{
logger.info("Launching Storm PirkTopology:");
PirkTopology.runPirkTopology();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 4bac8b7..d1a4797 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -19,7 +19,6 @@
package org.apache.pirk.schema.data;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
@@ -79,14 +78,12 @@ public class DataSchemaLoader
static
{
logger.info("Loading pre-configured data schemas: ");
-
try
{
initialize();
- } catch (Exception e)
+ } catch (PIRException e)
{
- logger.error("Caught exception: ");
- e.printStackTrace();
+ logger.error(e.getLocalizedMessage());
}
}
@@ -94,10 +91,10 @@ public class DataSchemaLoader
/**
* Initializes the static {@link DataSchemaRegistry} with a list of available data schema names.
*
- * @throws Exception
+ * @throws PIRException
* - failed to initialize
*/
- public static void initialize() throws Exception
+ public static void initialize() throws PIRException
{
initialize(false, null);
}
@@ -110,10 +107,10 @@ public class DataSchemaLoader
* If true, specifies that the data schema is an hdfs file; if false, that it is a regular file.
* @param fs
* Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the data schema exists
- * @throws Exception
- * - failed to initialize
+ * @throws PIRException
+ * - failed to initialize the data schemas because they could not be read or are invalid.
*/
- public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+ public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
{
String dataSchemas = SystemConfiguration.getProperty("data.schemas", "none");
if (dataSchemas.equals("none"))
@@ -122,41 +119,43 @@ public class DataSchemaLoader
}
String[] dataSchemaFiles = dataSchemas.split(",");
- for (String schemaFile : dataSchemaFiles)
+ try
{
- logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs);
-
- // Parse and load the schema file into a DataSchema object; place in the schemaMap
- DataSchemaLoader loader = new DataSchemaLoader();
- InputStream is = null;
- if (hdfs)
+ for (String schemaFile : dataSchemaFiles)
{
- logger.info("hdfs: filePath = " + schemaFile);
- is = fs.open(fs.makeQualified(new Path(schemaFile)));
- }
- else
- {
- try
- {
- is = new FileInputStream(schemaFile);
- logger.info("localFS: inputFile = " + schemaFile);
- } catch (FileNotFoundException e)
- {
- logger.info("localFS: inputFile = " + schemaFile + " not found");
- }
+ DataSchema dataSchema = readSchemaFile(schemaFile, fs, hdfs);
+ DataSchemaRegistry.put(dataSchema);
}
+ } catch (IOException e)
+ {
+ throw new PIRException("Error reading data schema", e);
+ }
+ }
- if (is != null)
- {
- try
- {
- DataSchema dataSchema = loader.loadSchema(is);
- DataSchemaRegistry.put(dataSchema);
- } finally
- {
- is.close();
- }
- }
+ private static DataSchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+ {
+ logger.info("Loading data schemaFile = " + schemaFile + " hdfs = " + hdfs);
+
+ // Parse and load the schema file into a DataSchema object; place in the schemaMap
+ DataSchemaLoader loader = new DataSchemaLoader();
+ InputStream is;
+ if (hdfs)
+ {
+ logger.info("hdfs: filePath = " + schemaFile);
+ is = fs.open(fs.makeQualified(new Path(schemaFile)));
+ }
+ else
+ {
+ logger.info("localFS: inputFile = " + schemaFile);
+ is = new FileInputStream(schemaFile);
+ }
+
+ try
+ {
+ return loader.loadSchema(is);
+ } finally
+ {
+ is.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index 433a303..c1b4139 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -19,7 +19,6 @@
package org.apache.pirk.schema.query;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
@@ -85,10 +84,9 @@ public class QuerySchemaLoader
try
{
initialize();
- } catch (Exception e)
+ } catch (PIRException e)
{
- logger.error("Caught exception: ");
- e.printStackTrace();
+ logger.error(e.getLocalizedMessage());
}
}
@@ -96,10 +94,10 @@ public class QuerySchemaLoader
/**
* Initializes the static {@link QuerySchemaRegistry} with a list of query schema names.
*
- * @throws Exception
+ * @throws PIRException
* - failed to initialize
*/
- public static void initialize() throws Exception
+ public static void initialize() throws PIRException
{
initialize(false, null);
}
@@ -112,52 +110,55 @@ public class QuerySchemaLoader
* If true, specifies that the query schema is an hdfs file; if false, that it is a regular file.
* @param fs
* Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the query schema exists
- * @throws Exception
- * - failed to initialize
+ * @throws PIRException
+ * - failed to initialize the query schemas because they could not be read or are invalid.
*/
- public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+ public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
{
String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
if (querySchemas.equals("none"))
{
return;
}
+
String[] querySchemaFiles = querySchemas.split(",");
- for (String schemaFile : querySchemaFiles)
+ try
{
- logger.info("Loading schemaFile = " + schemaFile);
-
- // Parse and load the schema file into a QuerySchema object; place in the schemaMap
- QuerySchemaLoader loader = new QuerySchemaLoader();
- InputStream is = null;
- if (hdfs)
+ for (String schemaFile : querySchemaFiles)
{
- is = fs.open(new Path(schemaFile));
- logger.info("hdfs: filePath = " + schemaFile);
- }
- else
- {
- try
- {
- is = new FileInputStream(schemaFile);
- logger.info("localFS: inputFile = " + schemaFile);
- } catch (FileNotFoundException e)
- {
- logger.info("localFS: inputFile = " + schemaFile + " not found");
- }
+ QuerySchema querySchema = readSchemaFile(schemaFile, fs, hdfs);
+ QuerySchemaRegistry.put(querySchema);
}
+ } catch (IOException e)
+ {
+ throw new PIRException("Error reading query schema", e);
+ }
+ }
- if (is != null)
- {
- try
- {
- QuerySchema querySchema = loader.loadSchema(is);
- QuerySchemaRegistry.put(querySchema);
- } finally
- {
- is.close();
- }
- }
+ private static QuerySchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+ {
+ logger.info("Loading query schemaFile = " + schemaFile);
+
+ // Parse and load the schema file into a QuerySchema object.
+ QuerySchemaLoader loader = new QuerySchemaLoader();
+ InputStream is;
+ if (hdfs)
+ {
+ logger.info("hdfs: filePath = " + schemaFile);
+ is = fs.open(new Path(schemaFile));
+ }
+ else
+ {
+ logger.info("localFS: inputFile = " + schemaFile);
+ is = new FileInputStream(schemaFile);
+ }
+
+ try
+ {
+ return loader.loadSchema(is);
+ } finally
+ {
+ is.close();
}
}