You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pirk.apache.org by ea...@apache.org on 2016/07/23 23:35:03 UTC
incubator-pirk git commit: PIRK-25 Serialization and storage changes
to Querier, Query, and Response - closes apache/incubator-pirk#18
Repository: incubator-pirk
Updated Branches:
refs/heads/master ef8d1c1a5 -> 442b21790
PIRK-25 Serialization and storage changes to Querier, Query, and Response - closes apache/incubator-pirk#18
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/442b2179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/442b2179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/442b2179
Branch: refs/heads/master
Commit: 442b21790d16bf185b6bd8b56d649ae79b35ade0
Parents: ef8d1c1
Author: tellison <te...@apache.org>
Authored: Sat Jul 23 19:34:45 2016 -0400
Committer: eawilliams <ea...@apache.org>
Committed: Sat Jul 23 19:34:45 2016 -0400
----------------------------------------------------------------------
.../apache/pirk/querier/wideskies/Querier.java | 91 +-----------
.../pirk/querier/wideskies/QuerierDriver.java | 9 +-
.../querier/wideskies/encrypt/EncryptQuery.java | 34 -----
.../org/apache/pirk/query/wideskies/Query.java | 148 +------------------
.../responder/wideskies/ResponderDriver.java | 3 +-
.../wideskies/mapreduce/ColumnMultReducer.java | 3 +-
.../mapreduce/ComputeResponseTool.java | 5 +-
.../wideskies/mapreduce/ExpTableMapper.java | 5 +-
.../mapreduce/FinalResponseReducer.java | 9 +-
.../HashSelectorsAndPartitionDataMapper.java | 4 +-
.../wideskies/mapreduce/RowCalcReducer.java | 3 +-
.../wideskies/spark/ComputeExpLookupTable.java | 10 +-
.../wideskies/spark/ComputeResponse.java | 16 +-
.../wideskies/standalone/Responder.java | 3 +-
.../pirk/response/wideskies/Response.java | 134 +----------------
.../serialization/HadoopFileSystemStore.java | 94 ++++++++++++
.../pirk/serialization/JavaSerializer.java | 49 ++++++
.../pirk/serialization/JsonSerializer.java | 41 +++++
.../serialization/LocalFileSystemStore.java | 82 ++++++++++
.../serialization/SerializationService.java | 49 ++++++
.../org/apache/pirk/serialization/Storable.java | 25 ++++
.../pirk/serialization/StorageService.java | 39 +++++
.../distributed/testsuite/DistTestSuite.java | 5 +-
.../apache/pirk/test/utils/StandaloneQuery.java | 13 +-
24 files changed, 448 insertions(+), 426 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
index 7ffc7a0..4d6523d 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java
@@ -1,4 +1,4 @@
-/*
+/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,15 +15,9 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- */
+ *******************************************************************************/
package org.apache.pirk.querier.wideskies;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,6 +25,7 @@ import java.util.HashMap;
import org.apache.pirk.encryption.Paillier;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.serialization.Storable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +33,7 @@ import org.slf4j.LoggerFactory;
* Class to hold the information necessary for the PIR querier to perform decryption
*
*/
-public class Querier implements Serializable
+public class Querier implements Serializable, Storable
{
private static final long serialVersionUID = 1L;
@@ -95,82 +90,4 @@ public class Querier implements Serializable
{
return embedSelectorMap;
}
-
- /**
- * Method to serialize the Querier object to a file
- */
- public void writeToFile(String filename) throws IOException
- {
- writeToFile(new File(filename));
- }
-
- /**
- * Method to serialize the Querier object to a file
- */
- public void writeToFile(File file) throws IOException
- {
- ObjectOutputStream oos = null;
- FileOutputStream fout = null;
- try
- {
- fout = new FileOutputStream(file, true);
- oos = new ObjectOutputStream(fout);
- oos.writeObject(this);
- } catch (Exception ex)
- {
- ex.printStackTrace();
- } finally
- {
- if (oos != null)
- {
- oos.close();
- }
- if (fout != null)
- {
- fout.close();
- }
- }
- }
-
- /**
- * Reconstruct the Querier object from its file serialization
- */
- public static Querier readFromFile(String filename) throws IOException
- {
-
- return readFromFile(new File(filename));
- }
-
- /**
- * Reconstruct the Querier object from its file serialization
- */
- public static Querier readFromFile(File file) throws IOException
- {
- Querier querier = null;
-
- FileInputStream fIn = null;
- ObjectInputStream oIn;
- try
- {
- fIn = new FileInputStream(file);
- oIn = new ObjectInputStream(fIn);
- querier = (Querier) oIn.readObject();
- } catch (IOException | ClassNotFoundException e)
- {
- e.printStackTrace();
- } finally
- {
- if (fIn != null)
- {
- try
- {
- fIn.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- return querier;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index 01a6c86..8f287fd 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -29,6 +29,7 @@ import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.query.LoadQuerySchemas;
+import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.FileIOUtils;
import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
@@ -82,6 +83,7 @@ public class QuerierDriver implements Serializable
String outputFile;
String queryType = null;
int numThreads;
+ LocalFileSystemStore storage = new LocalFileSystemStore();
// Encryption variables
int hashBitSize = 0;
@@ -182,14 +184,15 @@ public class QuerierDriver implements Serializable
// Write necessary output files - two files written -
// (1) Querier object to <outputFile>-QuerierConst.QUERIER_FILETAG
// (2) Query object to <outputFile>-QuerierConst.QUERY_FILETAG
- encryptQuery.writeOutputFiles(outputFile);
+ storage.store(outputFile + "-" + QuerierConst.QUERIER_FILETAG, encryptQuery.getQuerier());
+ storage.store(outputFile + "-" + QuerierConst.QUERY_FILETAG, encryptQuery.getQuery());
}
else
// Decryption
{
// Reconstruct the necessary objects from the files
- Response response = Response.readFromFile(inputFile);
- Querier querier = Querier.readFromFile(querierFile);
+ Response response = storage.recall(inputFile, Response.class);
+ Querier querier = storage.recall(querierFile, Querier.class);
// Perform decryption and output the result file
DecryptResponse decryptResponse = new DecryptResponse(response, querier);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
index 87ee9d9..a277c46 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
@@ -258,38 +258,4 @@ public class EncryptQuery
}
logger.info("Completed creation of encrypted query vectors");
}
-
- /**
- * Creates two output files - two files written:
- * <p>
- * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG
- * <p>
- * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG
- */
- public void writeOutputFiles(String filePrefix) throws IOException
- {
- // Write the Querier object
- querier.writeToFile(filePrefix + "-" + QuerierConst.QUERIER_FILETAG);
-
- // Write the Query object
- query.writeToFile(filePrefix + "-" + QuerierConst.QUERY_FILETAG);
- }
-
- /**
- * Creates two output files - two files written:
- * <p>
- * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG
- * <p>
- * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG
- * <p>
- * This method is used for functional testing
- */
- public void writeOutputFiles(File fileQuerier, File fileQuery) throws IOException
- {
- // Write the Querier object
- querier.writeToFile(fileQuerier);
-
- // Write the Query object
- query.writeToFile(fileQuery);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/query/wideskies/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java
index ebaafbb..2035d4b 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/Query.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java
@@ -18,12 +18,6 @@
*/
package org.apache.pirk.query.wideskies;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -33,10 +27,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.querier.wideskies.encrypt.ExpTableRunnable;
+import org.apache.pirk.serialization.Storable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +37,7 @@ import org.slf4j.LoggerFactory;
* Class to hold the PIR query vectors
*
*/
-public class Query implements Serializable
+public class Query implements Serializable, Storable
{
private static final long serialVersionUID = 1L;
@@ -225,141 +218,4 @@ public class Query implements Serializable
{
return expTable.get(value).get(power);
}
-
- public void writeToFile(String filename) throws IOException
- {
- writeToFile(new File(filename));
- }
-
- public void writeToFile(File file) throws IOException
- {
- ObjectOutputStream oos = null;
- FileOutputStream fout = null;
- try
- {
- fout = new FileOutputStream(file, true);
- oos = new ObjectOutputStream(fout);
- oos.writeObject(this);
- } catch (Exception ex)
- {
- ex.printStackTrace();
- } finally
- {
- if (oos != null)
- {
- oos.close();
- }
- if (fout != null)
- {
- fout.close();
- }
- }
- }
-
- /**
- * Reconstruct the Query object from its file serialization
- */
- public static Query readFromFile(String filename) throws IOException
- {
-
- return readFromFile(new File(filename));
- }
-
- /**
- * Reconstruct the Query object from its file serialization
- */
- public static Query readFromFile(File file) throws IOException
- {
- Query query = null;
-
- FileInputStream fIn = null;
- ObjectInputStream oIn;
- try
- {
- fIn = new FileInputStream(file);
- oIn = new ObjectInputStream(fIn);
- query = (Query) oIn.readObject();
- } catch (IOException | ClassNotFoundException e)
- {
- e.printStackTrace();
- } finally
- {
- if (fIn != null)
- {
- try
- {
- fIn.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- return query;
- }
-
- /**
- * Method to write the Query object to a file in HDFS
- *
- */
- public void writeToHDFSFile(Path fileName, FileSystem fs)
- {
-
- ObjectOutputStream oos = null;
- try
- {
- oos = new ObjectOutputStream(fs.create(fileName));
- oos.writeObject(this);
- oos.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- } finally
- {
- if (oos != null)
- {
- try
- {
- oos.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
-
- /**
- * Method to reconstruct the Query object from its file serialization in HDFS
- */
- public static Query readFromHDFSFile(Path filename, FileSystem fs)
- {
- Query pirWLQuery = null;
-
- ObjectInputStream ois = null;
- try
- {
- ois = new ObjectInputStream(fs.open(filename));
- pirWLQuery = (Query) ois.readObject();
- ois.close();
-
- } catch (IOException | ClassNotFoundException e1)
- {
- e1.printStackTrace();
- } finally
- {
- if (ois != null)
- {
- try
- {
- ois.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
-
- return pirWLQuery;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 4cd6b5f..61dbb23 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -25,6 +25,7 @@ import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
import org.apache.pirk.responder.wideskies.standalone.Responder;
+import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.SystemConfiguration;
/**
@@ -65,7 +66,7 @@ public class ResponderDriver
System.out.println("Launching Standalone Responder:");
String queryInput = SystemConfiguration.getProperty("pir.queryInput");
- Query query = Query.readFromFile(queryInput);
+ Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
Responder pirResponder = new Responder(query);
pirResponder.computeStandaloneResponse();
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
index abffadf..df3b7d0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.FileConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Te
FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index fb3027b..6eab9fe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -49,6 +49,7 @@ import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.FileConst;
import org.apache.pirk.utils.HDFS;
import org.apache.pirk.utils.SystemConfiguration;
@@ -127,7 +128,7 @@ public class ComputeResponseTool extends Configured implements Tool
LoadDataSchemas.initialize(true, fs);
LoadQuerySchemas.initialize(true, fs);
- query = Query.readFromHDFSFile(new Path(queryInputDir), fs);
+ query = new HadoopFileSystemStore(fs).recall(queryInputDir, Query.class);
queryInfo = query.getQueryInfo();
qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
@@ -331,7 +332,7 @@ public class ComputeResponseTool extends Configured implements Tool
// Place exp table in query object
query.setExpFileBasedLookup(expFileTable);
- query.writeToHDFSFile(new Path(queryInputDir), fs);
+ new HadoopFileSystemStore(fs).store(queryInputDir, query);
logger.info("Completed creation of expTable");
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
index 28d49a3..c53acdc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.math.BigInteger;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +52,8 @@ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text>
valueOut = new Text();
- FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ query = new HadoopFileSystemStore(FileSystem.newInstance(ctx.getConfiguration())).recall(queryDir, Query.class);
int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize();
maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1;
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
index 1df7b0e..8f7cbe8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.math.BigInteger;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
@@ -30,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +46,8 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
private Response response = null;
private String outputFile = null;
private FileSystem fs = null;
+ private HadoopFileSystemStore storage = null;
+ private QueryInfo queryInfo = null;
@Override
public void setup(Context ctx) throws IOException, InterruptedException
@@ -56,8 +58,9 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
mos = new MultipleOutputs<>(ctx);
fs = FileSystem.newInstance(ctx.getConfiguration());
+ storage = new HadoopFileSystemStore(fs);
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ Query query = storage.recall(queryDir, Query.class);
QueryInfo queryInfo = query.getQueryInfo();
outputFile = ctx.getConfiguration().get("pirMR.outputFile");
@@ -83,7 +86,7 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable
@Override
public void cleanup(Context ctx) throws IOException, InterruptedException
{
- response.writeToHDFSFile(new Path(outputFile), fs);
+ storage.store(outputFile, response);
mos.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
index 95396a9..b04babd 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
@@ -36,6 +36,7 @@ import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.filter.DataFilter;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.StringUtils;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
@@ -57,6 +58,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable
HashSet<String> stopList = null;
+ private Query query = null;
private QueryInfo queryInfo = null;
private QuerySchema qSchema = null;
private DataSchema dSchema = null;
@@ -75,7 +77,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable
// Can make this so that it reads multiple queries at one time...
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- Query query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
queryInfo = query.getQueryInfo();
try
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
index e35ee84..ea57d2d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java
@@ -37,6 +37,7 @@ import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.FileConst;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
@@ -79,7 +80,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW
fs = FileSystem.newInstance(ctx.getConfiguration());
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- query = Query.readFromHDFSFile(new Path(queryDir), fs);
+ query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
queryInfo = query.getQueryInfo();
try
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
index 938c32e..2feeca8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java
@@ -28,12 +28,14 @@ import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Tuple2;
/**
@@ -100,7 +102,13 @@ public class ComputeExpLookupTable
// Place exp table in query object and in the BroadcastVars
Map<Integer,String> queryHashFileNameMap = hashToPartition.collectAsMap();
query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap));
- query.writeToHDFSFile(new Path(queryInputFile), fs);
+ try
+ {
+ new HadoopFileSystemStore(fs).store(queryInputFile, query);
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ }
bVars.setQuery(query);
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index ba7fd12..c6b0d28 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -18,12 +18,12 @@
*/
package org.apache.pirk.responder.wideskies.spark;
+import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -36,6 +36,7 @@ import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.data.LoadDataSchemas;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.apache.spark.SparkConf;
@@ -45,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Tuple2;
/**
@@ -77,6 +79,7 @@ public class ComputeResponse
private boolean useModExpJoin = false;
private FileSystem fs = null;
+ private HadoopFileSystemStore storage = null;
private JavaSparkContext sc = null;
private Accumulators accum = null;
@@ -93,6 +96,7 @@ public class ComputeResponse
public ComputeResponse(FileSystem fileSys) throws Exception
{
fs = fileSys;
+ storage = new HadoopFileSystemStore(fs);
dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
@@ -162,7 +166,7 @@ public class ComputeResponse
bVars = new BroadcastVars(sc);
// Set the Query and QueryInfo broadcast variables
- query = Query.readFromHDFSFile(new Path(queryInput), fs);
+ query = storage.recall(queryInput, Query.class);
queryInfo = query.getQueryInfo();
bVars.setQuery(query);
bVars.setQueryInfo(queryInfo);
@@ -366,7 +370,13 @@ public class ComputeResponse
logger.debug("colNum = " + colVal + " column = " + encColResults.get(colVal).toString());
}
- response.writeToHDFSFile(new Path(outputFile), fs);
+ try
+ {
+ storage.store(outputFile, response);
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
accum.printAll();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
index 879b618..4ac3923 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java
@@ -30,6 +30,7 @@ import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.KeyedHash;
import org.apache.pirk.utils.SystemConfiguration;
import org.json.simple.JSONObject;
@@ -126,7 +127,7 @@ public class Responder
// Set the response object, extract, write to file
String outputFile = SystemConfiguration.getProperty("pir.outputFile");
setResponseElements();
- response.writeToFile(outputFile);
+ new LocalFileSystemStore().store(outputFile, response);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/response/wideskies/Response.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java
index 3d2a3c0..667b8a3 100644
--- a/src/main/java/org/apache/pirk/response/wideskies/Response.java
+++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java
@@ -18,19 +18,12 @@
*/
package org.apache.pirk.response.wideskies;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.TreeMap;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.serialization.Storable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +33,7 @@ import org.slf4j.LoggerFactory;
* Serialized and returned to the querier for decryption
*
*/
-public class Response implements Serializable
+public class Response implements Serializable, Storable
{
private static final long serialVersionUID = 1L;
@@ -76,127 +69,4 @@ public class Response implements Serializable
{
responseElements.put(position, element);
}
-
- public void writeToFile(String filename) throws IOException
- {
- writeToFile(new File(filename));
- }
-
- public void writeToFile(File file) throws IOException
- {
- ObjectOutputStream oos = null;
- FileOutputStream fout = null;
- try
- {
- fout = new FileOutputStream(file, true);
- oos = new ObjectOutputStream(fout);
- oos.writeObject(this);
- } catch (Exception ex)
- {
- ex.printStackTrace();
- } finally
- {
- if (oos != null)
- {
- oos.close();
- }
- if (fout != null)
- {
- fout.close();
- }
- }
- }
-
- public void writeToHDFSFile(Path fileName, FileSystem fs)
- {
-
- ObjectOutputStream oos = null;
- try
- {
- oos = new ObjectOutputStream(fs.create(fileName));
- oos.writeObject(this);
- oos.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- } finally
- {
- if (oos != null)
- {
- try
- {
- oos.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
-
- public static Response readFromFile(String filename) throws IOException
- {
- return readFromFile(new File(filename));
- }
-
- public static Response readFromFile(File file) throws IOException
- {
- Response response = null;
-
- ObjectInputStream objectinputstream = null;
- FileInputStream streamIn = null;
- try
- {
- streamIn = new FileInputStream(file);
- objectinputstream = new ObjectInputStream(streamIn);
- response = (Response) objectinputstream.readObject();
-
- } catch (Exception e)
- {
- e.printStackTrace();
- } finally
- {
- if (objectinputstream != null)
- {
- objectinputstream.close();
- }
- if (streamIn != null)
- {
- streamIn.close();
- }
- }
-
- return response;
- }
-
- // Used for testing
- public static Response readFromHDFSFile(Path file, FileSystem fs) throws IOException
- {
- Response response = null;
-
- ObjectInputStream ois = null;
- try
- {
- ois = new ObjectInputStream(fs.open(file));
- response = (Response) ois.readObject();
- ois.close();
-
- } catch (IOException | ClassNotFoundException e1)
- {
- e1.printStackTrace();
- } finally
- {
- if (ois != null)
- {
- try
- {
- ois.close();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
- return response;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
new file mode 100644
index 0000000..7e1e475
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HadoopFileSystemStore extends StorageService
+{
+
+ private FileSystem hadoopFileSystem;
+
+ // Prevents others from using default constructor.
+ HadoopFileSystemStore()
+ {
+ super();
+ }
+
+ /**
+ * Creates a new storage service on the given HDFS file system using default Java serialization.
+ */
+ public HadoopFileSystemStore(FileSystem fs)
+ {
+ super();
+ hadoopFileSystem = fs;
+ }
+
+ public HadoopFileSystemStore(FileSystem fs, SerializationService serial)
+ {
+ super(serial);
+ hadoopFileSystem = fs;
+ }
+
+ public void store(String pathName, Storable value) throws IOException
+ {
+ store(new Path(pathName), value);
+ }
+
+ public void store(Path path, Storable obj) throws IOException
+ {
+ OutputStream os = hadoopFileSystem.create(path);
+ try
+ {
+ serializer.write(os, obj);
+ } finally
+ {
+ if (os != null)
+ {
+ os.close();
+ }
+ }
+ }
+
+ public <T> T recall(String pathName, Class<T> type) throws IOException
+ {
+ return recall(new Path(pathName), type);
+ }
+
+ public <T> T recall(Path path, Class<T> type) throws IOException
+ {
+ InputStream is = hadoopFileSystem.open(path);
+ try
+ {
+ return serializer.read(is, type);
+ } finally
+ {
+ if (is != null)
+ {
+ is.close();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/JavaSerializer.java b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
new file mode 100644
index 0000000..4228c19
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public class JavaSerializer extends SerializationService
+{
+
+ public void write(OutputStream stream, Storable obj) throws IOException
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(stream);
+ oos.writeObject(obj);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T read(InputStream stream, Class<T> type) throws IOException
+ {
+ ObjectInputStream oin = new ObjectInputStream(stream);
+ try
+ {
+ return (T) oin.readObject();
+ } catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/JsonSerializer.java b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
new file mode 100644
index 0000000..c33366d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+//TODO: Waiting for Jackson adoption
+public class JsonSerializer extends SerializationService
+{
+
+ @Override
+ public void write(OutputStream w, Storable obj) throws IOException
+ {
+ throw new RuntimeException("Not yet implemented");
+ }
+
+ @Override
+ public <T> T read(InputStream stream, Class<T> type) throws IOException
+ {
+ throw new RuntimeException("Not yet implemented");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
new file mode 100644
index 0000000..50d11c3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class LocalFileSystemStore extends StorageService
+{
+
+ /**
+ * Creates a new storage service on the local file system using default Java serialization.
+ */
+ public LocalFileSystemStore()
+ {
+ super();
+ }
+
+ public LocalFileSystemStore(SerializationService serial)
+ {
+ super(serial);
+ }
+
+ public void store(String path, Storable obj) throws IOException
+ {
+ store(new File(path), obj);
+ }
+
+ public void store(File file, Storable obj) throws IOException
+ {
+ FileOutputStream fos = new FileOutputStream(file);
+ try
+ {
+ serializer.write(fos, obj);
+ } finally
+ {
+ if (fos != null)
+ {
+ fos.close();
+ }
+ }
+ }
+
+ public <T> T recall(String path, Class<T> type) throws IOException
+ {
+ return recall(new File(path), type);
+ }
+
+ public <T> T recall(File file, Class<T> type) throws IOException
+ {
+ FileInputStream fis = new FileInputStream(file);
+ try
+ {
+ return serializer.read(fis, type);
+ } finally
+ {
+ if (fis != null)
+ {
+ fis.close();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/SerializationService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/SerializationService.java b/src/main/java/org/apache/pirk/serialization/SerializationService.java
new file mode 100644
index 0000000..2764fc8
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/SerializationService.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/*
+ * Ability to read and write objects to/from a stream.
+ */
+public abstract class SerializationService
+{
+ public abstract <T> T read(InputStream stream, Class<T> type) throws IOException;
+
+ public abstract void write(OutputStream w, Storable obj) throws IOException;
+
+ public byte[] toBytes(Storable obj)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ try
+ {
+ write(bos, obj);
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return bos.toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/Storable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/Storable.java b/src/main/java/org/apache/pirk/serialization/Storable.java
new file mode 100644
index 0000000..d9e2fb3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/Storable.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+// Marker interface
+public interface Storable
+{
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/StorageService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/serialization/StorageService.java b/src/main/java/org/apache/pirk/serialization/StorageService.java
new file mode 100644
index 0000000..775a313
--- /dev/null
+++ b/src/main/java/org/apache/pirk/serialization/StorageService.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.serialization;
+
+abstract class StorageService
+{
+ SerializationService serializer;
+
+ StorageService()
+ {
+ this.setSerializer(new JavaSerializer());
+ }
+
+ StorageService(SerializationService service)
+ {
+ this.setSerializer(service);
+ }
+
+ public void setSerializer(SerializationService service)
+ {
+ serializer = service;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
index 37cb43c..020d464 100644
--- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
+++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
@@ -36,6 +36,7 @@ import org.apache.pirk.responder.wideskies.ResponderCLI;
import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
import org.apache.pirk.test.distributed.DistributedTestDriver;
import org.apache.pirk.test.utils.BaseTests;
import org.apache.pirk.test.utils.Inputs;
@@ -342,7 +343,7 @@ public class DistTestSuite
// Write the Querier object to a file
Path queryInputDirPath = new Path(queryInputDir);
- query.writeToHDFSFile(queryInputDirPath, fs);
+ new HadoopFileSystemStore(fs).store(queryInputDirPath, query);
fs.deleteOnExit(queryInputDirPath);
// Grab the original data and query schema properties to reset upon completion
@@ -413,7 +414,7 @@ public class DistTestSuite
// Perform decryption
// Reconstruct the necessary objects from the files
logger.info("Performing decryption; writing final results file");
- Response response = Response.readFromHDFSFile(new Path(outputFile), fs);
+ Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
// Perform decryption and output the result file
DecryptResponse decryptResponse = new DecryptResponse(response, querier);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
index aeda7dc..c33971e 100644
--- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
+++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java
@@ -34,6 +34,7 @@ import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.responder.wideskies.standalone.Responder;
import org.apache.pirk.response.wideskies.Response;
import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.PIRException;
import org.apache.pirk.utils.SystemConfiguration;
import org.json.simple.JSONObject;
@@ -60,6 +61,7 @@ public class StandaloneQuery
ArrayList<QueryResponseJSON> results = null;
// Create the necessary files
+ LocalFileSystemStore storage = new LocalFileSystemStore();
String querySideOuputFilePrefix = "querySideOut";
File fileQuerier = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERIER_FILETAG, ".txt");
File fileQuery = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERY_FILETAG, ".txt");
@@ -98,11 +100,12 @@ public class StandaloneQuery
}
// Write necessary output files
- encryptQuery.writeOutputFiles(fileQuerier, fileQuery);
+ storage.store(fileQuerier, encryptQuery.getQuerier());
+ storage.store(fileQuery, encryptQuery.getQuery());
// Perform the PIR query and build the response elements
logger.info("Performing the PIR Query and constructing the response elements:");
- Query query = Query.readFromFile(fileQuery);
+ Query query = storage.recall(fileQuery, Query.class);
Responder pirResponder = new Responder(query);
logger.info("Query and Responder elements constructed");
for (JSONObject jsonData : dataElements)
@@ -123,14 +126,14 @@ public class StandaloneQuery
logger.info("Forming response from response elements; writing to a file");
pirResponder.setResponseElements();
Response responseOut = pirResponder.getResponse();
- responseOut.writeToFile(fileResponse);
+ storage.store(fileResponse, responseOut);
logger.info("Completed forming response from response elements and writing to a file");
// Perform decryption
// Reconstruct the necessary objects from the files
logger.info("Performing decryption; writing final results file");
- Response responseIn = Response.readFromFile(fileResponse);
- Querier querier = Querier.readFromFile(fileQuerier);
+ Response responseIn = storage.recall(fileResponse, Response.class);
+ Querier querier = storage.recall(fileQuerier, Querier.class);
// Perform decryption and output the result file
DecryptResponse decryptResponse = new DecryptResponse(responseIn, querier);