Posted to commits@pirk.apache.org by ea...@apache.org on 2016/09/23 22:39:39 UTC

incubator-pirk git commit: Narrow ResponderPlugin run method to throw PIRException -- closes apache/incubator-pirk#99

Repository: incubator-pirk
Updated Branches:
  refs/heads/master 43c772c45 -> 98b1b4c36


Narrow ResponderPlugin run method to throw PIRException -- closes apache/incubator-pirk#99


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/98b1b4c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/98b1b4c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/98b1b4c3

Branch: refs/heads/master
Commit: 98b1b4c36d5ff10ec86f7e24225c5c42ac6004fa
Parents: 43c772c
Author: tellison <te...@apache.org>
Authored: Fri Sep 23 18:39:31 2016 -0400
Committer: eawilliams <ea...@apache.org>
Committed: Fri Sep 23 18:39:31 2016 -0400

----------------------------------------------------------------------
 .../mapreduce/ComputeResponseTool.java          |  3 +-
 .../wideskies/mapreduce/MapReduceResponder.java | 14 +++-
 .../wideskies/spark/ComputeResponse.java        | 27 ++++---
 .../wideskies/spark/SparkResponder.java         |  3 +-
 .../streaming/ComputeStreamingResponse.java     | 52 ++++++++-----
 .../streaming/SparkStreamingResponder.java      | 28 +++++--
 .../wideskies/spi/ResponderPlugin.java          |  4 +-
 .../responder/wideskies/storm/PirkTopology.java | 19 ++++-
 .../wideskies/storm/StormResponder.java         |  3 +-
 .../pirk/schema/data/DataSchemaLoader.java      | 81 ++++++++++----------
 .../pirk/schema/query/QuerySchemaLoader.java    | 81 ++++++++++----------
 11 files changed, 189 insertions(+), 126 deletions(-)
----------------------------------------------------------------------

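For context, this change narrows the SPI's run() signature from a blanket "throws Exception" to the checked "throws PIRException": each framework plugin now wraps its own failures, and callers handle a single, precise exception type. A minimal caller-side sketch under the new contract (the launcher class below is hypothetical, not part of this commit):

    import org.apache.pirk.responder.wideskies.mapreduce.MapReduceResponder;
    import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
    import org.apache.pirk.utils.PIRException;

    // Hypothetical launcher, for illustration only: run() now declares
    // PIRException, so no catch-all for Exception is required.
    public class PluginLauncher
    {
      public static void main(String[] args)
      {
        ResponderPlugin plugin = new MapReduceResponder();
        try
        {
          plugin.run();
        } catch (PIRException e)
        {
          System.err.println("Responder failed: " + e.getLocalizedMessage());
        }
      }
    }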

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 80e8a13..21cf518 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -54,6 +54,7 @@ import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.FileConst;
 import org.apache.pirk.utils.HDFS;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
 import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private QueryInfo queryInfo = null;
   private QuerySchema qSchema = null;
 
-  public ComputeResponseTool() throws Exception
+  public ComputeResponseTool() throws IOException, PIRException
   {
     setupParameters();
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
index fc1d20b..fcbc88b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -20,6 +20,7 @@ package org.apache.pirk.responder.wideskies.mapreduce;
 
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,10 +37,17 @@ public class MapReduceResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching MapReduce ResponderTool:");
-    ComputeResponseTool pirWLTool = new ComputeResponseTool();
-    ToolRunner.run(pirWLTool, new String[] {});
+    try
+    {
+      ComputeResponseTool pirWLTool = new ComputeResponseTool();
+      ToolRunner.run(pirWLTool, new String[] {});
+    } catch (Exception e)
+    {
+      // An exception occurred invoking the tool, don't know how to recover.
+      throw new PIRException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index d6593f7..00bc5c1 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -99,7 +99,7 @@ public class ComputeResponse
 
   private boolean colMultReduceByKey = false;
 
-  public ComputeResponse(FileSystem fileSys) throws Exception
+  public ComputeResponse(FileSystem fileSys) throws PIRException
   {
     fs = fileSys;
     storage = new HadoopFileSystemStore(fs);
@@ -156,12 +156,18 @@ public class ComputeResponse
 
     // Setup, run query, teardown
     logger.info("Setting up for query run");
-    setup();
+    try
+    {
+      setup();
+    } catch (IOException e)
+    {
+      throw new PIRException("An error occurred setting up the Spark responder.", e);
+    }
     logger.info("Setup complete");
   }
 
   // Setup for the accumulators and broadcast variables
-  private void setup() throws Exception
+  private void setup() throws IOException, PIRException
   {
     // Load the schemas
     DataSchemaLoader.initialize(true, fs);
@@ -219,7 +225,7 @@ public class ComputeResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws Exception
+  public void performQuery() throws IOException, PIRException
   {
     logger.info("Performing query: ");
 
@@ -243,7 +249,7 @@ public class ComputeResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaRDD<MapWritable> readData() throws Exception
+  public JavaRDD<MapWritable> readData() throws IOException, PIRException
   {
     logger.info("Reading data ");
 
@@ -268,10 +274,13 @@ public class ComputeResponse
 
     // Set the inputFormatClass based upon the baseInputFormat property
     String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
-    Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
-    if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+    Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+    try
+    {
+      inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+    } catch (ClassNotFoundException | ClassCastException e)
     {
-      throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
     }
     job.setInputFormatClass(inputClass);
 
@@ -296,7 +305,7 @@ public class ComputeResponse
    * Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaRDD<MapWritable> readDataES() throws Exception
+  public JavaRDD<MapWritable> readDataES() throws IOException, PIRException
   {
     logger.info("Reading data ");
 

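A side note on the cast in readData() above: the generic cast to Class<? extends BaseInputFormat<Text,MapWritable>> is erased at runtime and will not itself throw ClassCastException, so in practice only the ClassNotFoundException arm fires. A variant that does validate the loaded class at runtime, sketched with Class.asSubclass (illustrative only, not part of this commit):

    // Illustrative alternative: asSubclass checks the class hierarchy at
    // runtime and throws ClassCastException if the loaded class does not
    // extend BaseInputFormat, which an erased generic cast cannot do.
    Class<? extends BaseInputFormat> inputClass;
    try
    {
      inputClass = Class.forName(classString).asSubclass(BaseInputFormat.class);
    } catch (ClassNotFoundException | ClassCastException e)
    {
      throw new PIRException(classString + " cannot be loaded or does not extend BaseInputFormat", e);
    }
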
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
index bd05236..fce905d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,7 @@ public class SparkResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching Spark ComputeResponse:");
     try

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index b91cc68..c291df0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -43,10 +43,12 @@ import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
 import org.apache.pirk.schema.data.DataSchema;
 import org.apache.pirk.schema.data.DataSchemaLoader;
 import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -106,7 +108,7 @@ public class ComputeStreamingResponse
 
   private boolean colMultReduceByKey = false;
 
-  public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+  public ComputeStreamingResponse(FileSystem fileSys) throws PIRException
   {
     fs = fileSys;
     storage = new HadoopFileSystemStore(fs);
@@ -171,12 +173,18 @@ public class ComputeStreamingResponse
 
     // Setup, run query, teardown
     logger.info("Setting up for query run");
-    setup();
+    try
+    {
+      setup();
+    } catch (IOException e)
+    {
+      throw new PIRException("An error occurred setting up the streaming responder.", e);
+    }
     logger.info("Setup complete");
   }
 
   // Setup for the accumulators and broadcast variables
-  private void setup() throws Exception
+  private void setup() throws IOException, PIRException
   {
     // Load the schemas
     DataSchemaLoader.initialize(true, fs);
@@ -191,11 +199,6 @@ public class ComputeStreamingResponse
     QueryInfo queryInfo = query.getQueryInfo();
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);
-    
-    if(query == null)
-    {
-    	logger.info("query is null for queryInput = " + queryInput);
-    }
 
     if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
     {
@@ -235,15 +238,20 @@ public class ComputeStreamingResponse
 
   /**
    * Method to start the computation
-   * 
-   * @throws InterruptedException
    */
-  public void start() throws InterruptedException
+  public void start()
   {
     logger.info("Starting computation...");
 
     jssc.start();
-    jssc.awaitTermination();
+    try
+    {
+      jssc.awaitTermination();
+    } catch (InterruptedException e)
+    {
+      // Interrupted while waiting for termination
+      Thread.interrupted();
+    }
   }
 
   /**
@@ -259,7 +267,7 @@ public class ComputeStreamingResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
+  public void performQuery() throws IOException, PIRException
   {
     logger.info("Performing query: ");
 
@@ -272,6 +280,9 @@ public class ComputeStreamingResponse
     {
       inputRDD = readDataES();
     }
+    else {
+      throw new PIRException("Unknown data input format " + dataInputFormat);
+    }
 
     performQuery(inputRDD);
   }
@@ -280,7 +291,7 @@ public class ComputeStreamingResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
+  public JavaDStream<MapWritable> readData() throws IOException, PIRException
   {
     logger.info("Reading data ");
 
@@ -296,10 +307,13 @@ public class ComputeStreamingResponse
 
     // Set the inputFormatClass based upon the baseInputFormat property
     String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
-    Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
-    if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+    Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+    try
+    {
+      inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+    } catch (ClassNotFoundException | ClassCastException e)
     {
-      throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
     }
     job.setInputFormatClass(inputClass);
 
@@ -397,10 +411,8 @@ public class ComputeStreamingResponse
   /**
    * Method to perform the query given an input JavaDStream of JSON
    * 
-   * @throws InterruptedException
-   * 
    */
-  public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
+  public void performQuery(JavaDStream<MapWritable> input)
   {
     logger.info("Performing query: ");
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
index 295a3cf..4ce0571 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -18,15 +18,16 @@
  */
 package org.apache.pirk.responder.wideskies.spark.streaming;
 
+import java.io.IOException;
+import java.security.Permission;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.pirk.responder.wideskies.ResponderDriver;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.security.Permission;
-
 /**
  * Class to launch stand alone responder
  */
@@ -40,15 +41,25 @@ public class SparkStreamingResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     // For handling System.exit calls from Spark Streaming
     System.setSecurityManager(new SystemExitManager());
+    
+    FileSystem fileSys;
+    try
+    {
+      fileSys = FileSystem.get(new Configuration());
+    } catch (IOException e)
+    {
+      throw new PIRException(e);
+    }
+    
     logger.info("Launching Spark ComputeStreamingResponse:");
     ComputeStreamingResponse computeSR = null;
     try
     {
-      computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+      computeSR = new ComputeStreamingResponse(fileSys);
       computeSR.performQuery();
     }
     catch (SystemExitException e)
@@ -56,6 +67,9 @@ public class SparkStreamingResponder implements ResponderPlugin
       // If System.exit(0) is not caught from Spark Streaming,
       // the application will complete with a 'failed' status
       logger.info("Exited with System.exit(0) from Spark Streaming");
+    } catch (IOException e)
+    {
+      throw new PIRException(e);
     }
     finally
     {
@@ -67,7 +81,9 @@ public class SparkStreamingResponder implements ResponderPlugin
 
   // Exception and Security Manager classes used to catch System.exit from Spark Streaming
   private static class SystemExitException extends SecurityException
-  {}
+  {
+    private static final long serialVersionUID = 1L;
+  }
 
   private static class SystemExitManager extends SecurityManager
   {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
index 3dade0d..912850a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -19,6 +19,8 @@
 
 package org.apache.pirk.responder.wideskies.spi;
 
+import org.apache.pirk.utils.PIRException;
+
 /**
  * Interface which launches a responder
  * <p>
@@ -36,5 +38,5 @@ public interface ResponderPlugin
   /**
    * This method launches your framework responder.
    */
-  public void run() throws Exception;
+  public void run() throws PIRException;
 }

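To illustrate the narrowed contract from the implementer's side, a hypothetical plugin (not part of this commit; it assumes the interface's existing getPlatformName method) wraps any internal failure in the checked PIRException before propagating:

    import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
    import org.apache.pirk.utils.PIRException;

    // Hypothetical plugin: internal failures are wrapped in the checked
    // PIRException rather than escaping as raw Exception.
    public class ExampleResponder implements ResponderPlugin
    {
      @Override
      public String getPlatformName()
      {
        return "example";
      }

      @Override
      public void run() throws PIRException
      {
        try
        {
          launchFramework(); // stand-in for framework-specific startup
        } catch (RuntimeException e)
        {
          throw new PIRException("Example responder failed to launch.", e);
        }
      }

      private void launchFramework()
      {
        // framework-specific launch logic would go here
      }
    }
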
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index e0f83d3..fd23c7e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -19,11 +19,18 @@
 package org.apache.pirk.responder.wideskies.storm;
 
 import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
@@ -61,7 +68,7 @@ public class PirkTopology
   private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
   private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
 
-  public static void runPirkTopology() throws Exception
+  public static void runPirkTopology() throws PIRException
   {
     // Set up Kafka parameters
     logger.info("Configuring Kafka.");
@@ -85,7 +92,13 @@ public class PirkTopology
 
     // Run topology
     logger.info("Submitting Pirk topology to Storm...");
-    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+    try
+    {
+      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e)
+    {
+      throw new PIRException(e);
+    }
 
   } // main
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
index 08400ac..b988ccc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -20,6 +20,7 @@
 package org.apache.pirk.responder.wideskies.storm;
 
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,7 @@ public class StormResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching Storm PirkTopology:");
     PirkTopology.runPirkTopology();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 4bac8b7..d1a4797 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -19,7 +19,6 @@
 package org.apache.pirk.schema.data;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -79,14 +78,12 @@ public class DataSchemaLoader
   static
   {
     logger.info("Loading pre-configured data schemas: ");
-
     try
     {
       initialize();
-    } catch (Exception e)
+    } catch (PIRException e)
     {
-      logger.error("Caught exception: ");
-      e.printStackTrace();
+      logger.error(e.getLocalizedMessage());
     }
   }
 
@@ -94,10 +91,10 @@ public class DataSchemaLoader
   /**
    * Initializes the static {@link DataSchemaRegistry} with a list of available data schema names.
    * 
-   * @throws Exception
+   * @throws PIRException
    *           - failed to initialize
    */
-  public static void initialize() throws Exception
+  public static void initialize() throws PIRException
   {
     initialize(false, null);
   }
@@ -110,10 +107,10 @@ public class DataSchemaLoader
    *          If true, specifies that the data schema is an hdfs file; if false, that it is a regular file.
    * @param fs
    *          Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the data schema exists
-   * @throws Exception
-   *           - failed to initialize
+   * @throws PIRException
+   *           - failed to initialize the data schemas because they could not be read or are invalid.
    */
-  public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+  public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
   {
     String dataSchemas = SystemConfiguration.getProperty("data.schemas", "none");
     if (dataSchemas.equals("none"))
@@ -122,41 +119,43 @@ public class DataSchemaLoader
     }
 
     String[] dataSchemaFiles = dataSchemas.split(",");
-    for (String schemaFile : dataSchemaFiles)
+    try
     {
-      logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs);
-
-      // Parse and load the schema file into a DataSchema object; place in the schemaMap
-      DataSchemaLoader loader = new DataSchemaLoader();
-      InputStream is = null;
-      if (hdfs)
+      for (String schemaFile : dataSchemaFiles)
       {
-        logger.info("hdfs: filePath = " + schemaFile);
-        is = fs.open(fs.makeQualified(new Path(schemaFile)));
-      }
-      else
-      {
-        try
-        {
-          is = new FileInputStream(schemaFile);
-          logger.info("localFS: inputFile = " + schemaFile);
-        } catch (FileNotFoundException e)
-        {
-          logger.info("localFS: inputFile = " + schemaFile + " not found");
-        }
+        DataSchema dataSchema = readSchemaFile(schemaFile, fs, hdfs);
+        DataSchemaRegistry.put(dataSchema);
       }
+    } catch (IOException e)
+    {
+      throw new PIRException("Error reading data schema", e);
+    }
+  }
 
-      if (is != null)
-      {
-        try
-        {
-          DataSchema dataSchema = loader.loadSchema(is);
-          DataSchemaRegistry.put(dataSchema);
-        } finally
-        {
-          is.close();
-        }
-      }
+  private static DataSchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+  {
+    logger.info("Loading data schemaFile = " + schemaFile + " hdfs = " + hdfs);
+
+    // Parse and load the schema file into a DataSchema object; place in the schemaMap
+    DataSchemaLoader loader = new DataSchemaLoader();
+    InputStream is;
+    if (hdfs)
+    {
+      logger.info("hdfs: filePath = " + schemaFile);
+      is = fs.open(fs.makeQualified(new Path(schemaFile)));
+    }
+    else
+    {
+      logger.info("localFS: inputFile = " + schemaFile);
+      is = new FileInputStream(schemaFile);
+    }
+
+    try
+    {
+      return loader.loadSchema(is);
+    } finally
+    {
+      is.close();
     }
   }
 

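The explicit try/finally in the extracted readSchemaFile could equally be written with try-with-resources; a sketch of that method body under Java 7+ style (illustrative only, assuming the loadSchema signature shown above):

    // Equivalent resource handling with try-with-resources: the stream is
    // closed automatically whether loadSchema returns or throws.
    try (InputStream is = hdfs ? fs.open(fs.makeQualified(new Path(schemaFile)))
                               : new FileInputStream(schemaFile))
    {
      return new DataSchemaLoader().loadSchema(is);
    }
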
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index 433a303..c1b4139 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -19,7 +19,6 @@
 package org.apache.pirk.schema.query;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
@@ -85,10 +84,9 @@ public class QuerySchemaLoader
     try
     {
       initialize();
-    } catch (Exception e)
+    } catch (PIRException e)
     {
-      logger.error("Caught exception: ");
-      e.printStackTrace();
+      logger.error(e.getLocalizedMessage());
     }
   }
 
@@ -96,10 +94,10 @@ public class QuerySchemaLoader
   /**
    * Initializes the static {@link QuerySchemaRegistry} with a list of query schema names.
    * 
-   * @throws Exception
+   * @throws PIRException
    *           - failed to initialize
    */
-  public static void initialize() throws Exception
+  public static void initialize() throws PIRException
   {
     initialize(false, null);
   }
@@ -112,52 +110,55 @@ public class QuerySchemaLoader
    *          If true, specifies that the query schema is an hdfs file; if false, that it is a regular file.
    * @param fs
    *          Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the query schema exists
-   * @throws Exception
-   *           - failed to initialize
+   * @throws PIRException
+   *           - failed to initialize the query schemas because they could not be read or are invalid.
    */
-  public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+  public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
   {
     String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
     if (querySchemas.equals("none"))
     {
       return;
     }
+
     String[] querySchemaFiles = querySchemas.split(",");
-    for (String schemaFile : querySchemaFiles)
+    try
     {
-      logger.info("Loading schemaFile = " + schemaFile);
-
-      // Parse and load the schema file into a QuerySchema object; place in the schemaMap
-      QuerySchemaLoader loader = new QuerySchemaLoader();
-      InputStream is = null;
-      if (hdfs)
+      for (String schemaFile : querySchemaFiles)
       {
-        is = fs.open(new Path(schemaFile));
-        logger.info("hdfs: filePath = " + schemaFile);
-      }
-      else
-      {
-        try
-        {
-          is = new FileInputStream(schemaFile);
-          logger.info("localFS: inputFile = " + schemaFile);
-        } catch (FileNotFoundException e)
-        {
-          logger.info("localFS: inputFile = " + schemaFile + " not found");
-        }
+        QuerySchema querySchema = readSchemaFile(schemaFile, fs, hdfs);
+        QuerySchemaRegistry.put(querySchema);
       }
+    } catch (IOException e)
+    {
+      throw new PIRException("Error reading query schema", e);
+    }
+  }
 
-      if (is != null)
-      {
-        try
-        {
-          QuerySchema querySchema = loader.loadSchema(is);
-          QuerySchemaRegistry.put(querySchema);
-        } finally
-        {
-          is.close();
-        }
-      }
+  private static QuerySchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+  {
+    logger.info("Loading query schemaFile = " + schemaFile);
+
+    // Parse and load the schema file into a QuerySchema object.
+    QuerySchemaLoader loader = new QuerySchemaLoader();
+    InputStream is;
+    if (hdfs)
+    {
+      logger.info("hdfs: filePath = " + schemaFile);
+      is = fs.open(new Path(schemaFile));
+    }
+    else
+    {
+      logger.info("localFS: inputFile = " + schemaFile);
+      is = new FileInputStream(schemaFile);
+    }
+
+    try
+    {
+      return loader.loadSchema(is);
+    } finally
+    {
+      is.close();
     }
   }