Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:10:01 UTC
[11/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
deleted file mode 100644
index 0f8f908..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.Analyzer;
-import com.pivotal.pxf.api.AnalyzerStats;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.service.ReadBridge;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-/**
- * Analyzer class for HDFS data resources
- *
- * Given an HDFS data source (a file, directory, or wildcard pattern),
- * returns statistics about it (number of blocks, number of tuples, etc.).
- */
-public class HdfsAnalyzer extends Analyzer {
- private JobConf jobConf;
- private FileSystem fs;
- private Log Log;
-
- /**
- * Constructs an HdfsAnalyzer object.
- *
- * @param inputData all input parameters coming from the client
- * @throws IOException if HDFS file system cannot be retrieved
- */
- public HdfsAnalyzer(InputData inputData) throws IOException {
- super(inputData);
- Log = LogFactory.getLog(HdfsAnalyzer.class);
-
- jobConf = new JobConf(new Configuration(), HdfsAnalyzer.class);
- fs = FileSystem.get(jobConf);
- }
-
- /**
- * Collects a number of basic statistics based on an estimate. Statistics
- * are: number of records, number of hdfs blocks and hdfs block size.
- *
- * @param datapath path is a data source URI that can appear as a file
- * name, a directory name or a wildcard pattern
- * @return statistics in JSON format
- * @throws Exception if path is wrong, its metadata cannot be retrieved
- * from file system, or if scanning the first block
- * using the accessor failed
- */
- @Override
- public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
- long blockSize = 0;
- long numberOfBlocks;
- Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
-
- ArrayList<InputSplit> splits = getSplits(path);
-
- for (InputSplit split : splits) {
- FileSplit fsp = (FileSplit) split;
- Path filePath = fsp.getPath();
- FileStatus fileStatus = fs.getFileStatus(filePath);
- if (fileStatus.isFile()) {
- blockSize = fileStatus.getBlockSize();
- break;
- }
- }
-
- // if no file is in path (only dirs), get default block size
- if (blockSize == 0) {
- blockSize = fs.getDefaultBlockSize(path);
- }
- numberOfBlocks = splits.size();
-
-
- long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
- AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
-
- //print files size to log when in debug level
- Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
-
- return stats;
- }
-
- /**
- * Calculates the number of tuples in a split (block).
- * Reads one block from HDFS. Exceptions during reading will
- * filter upwards and be handled in AnalyzerResource.
- */
- private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
- long tuples = -1; /* default - if we are not able to read data */
- ReadAccessor accessor;
-
- if (splits.isEmpty()) {
- return 0;
- }
-
- /*
- * metadata information includes: file split's
- * start, length and hosts (locations).
- */
- FileSplit firstSplit = (FileSplit) splits.get(0);
- byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
- inputData.setFragmentMetadata(fragmentMetadata);
- inputData.setDataSource(firstSplit.getPath().toUri().getPath());
- accessor = ReadBridge.getFileAccessor(inputData);
-
- if (accessor.openForRead()) {
- tuples = 0;
- while (accessor.readNextObject() != null) {
- tuples++;
- }
-
- accessor.closeForRead();
- }
-
- return tuples;
- }
-
- private ArrayList<InputSplit> getSplits(Path path) throws IOException {
- PxfInputFormat fformat = new PxfInputFormat();
- PxfInputFormat.setInputPaths(jobConf, path);
- InputSplit[] splits = fformat.getSplits(jobConf, 1);
- ArrayList<InputSplit> result = new ArrayList<InputSplit>();
-
- // remove empty splits
- if (splits != null) {
- for (InputSplit split : splits) {
- if (split.getLength() > 0) {
- result.add(split);
- }
- }
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
deleted file mode 100644
index 4a43c5f..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * Base class for enforcing the complete access of a file in one accessor.
- * Since the file is read through the "simple" stream API rather than the
- * splittable API, different segments cannot fetch different parts (splits) of
- * the file; each access brings the complete file. If several segments accessed
- * the same file, each would return the whole file and every record would appear
- * number_of_segments times in the query result. To avoid this, only one segment
- * (segment 0) does the work in this case, enforced by the isWorkingSegment()
- * method. Naturally this is the least recommended working mode, since it makes
- * no use of segment parallelism. HDFS accessors for
- * a specific file type should inherit from this class only if the file they are
- * reading does not support splitting: a protocol-buffer file, regular file, ...
- */
-public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
- private Configuration conf = null;
- protected InputStream inp = null;
- private FileSplit fileSplit = null;
-
- /**
- * Constructs a HdfsAtomicDataAccessor object.
- *
- * @param input all input parameters coming from the client
- */
- public HdfsAtomicDataAccessor(InputData input) {
- // 0. Hold the configuration data
- super(input);
-
- // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
- conf = new Configuration();
-
- fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
- }
-
- /**
- * Opens the file using the non-splittable API for HADOOP HDFS file access
- * This means that instead of using a FileInputFormat for access, we use a
- * Java stream.
- *
- * @return true for successful file open, false otherwise
- */
- @Override
- public boolean openForRead() throws Exception {
- if (!isWorkingSegment()) {
- return false;
- }
-
- // input data stream; fs.open() returns an FSDataInputStream
- FileSystem fs = FileSystem.get(URI.create(inputData.getDataSource()), conf);
- inp = fs.open(new Path(inputData.getDataSource()));
-
- return (inp != null);
- }
-
- /**
- * Fetches one record from the file.
- *
- * @return a {@link OneRow} record as a Java object. Returns null if none.
- */
- @Override
- public OneRow readNextObject() throws IOException {
- if (!isWorkingSegment()) {
- return null;
- }
-
- return new OneRow(null, new Object());
- }
-
- /**
- * Closes the access stream when finished reading the file
- */
- @Override
- public void closeForRead() throws Exception {
- if (!isWorkingSegment()) {
- return;
- }
-
- if (inp != null) {
- inp.close();
- }
- }
-
- /*
- * Making sure that only the segment that got assigned the first data
- * fragment will read the (whole) file.
- */
- private boolean isWorkingSegment() {
- return (fileSplit.getStart() == 0);
- }
-
- @Override
- public boolean isThreadSafe() {
- return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
- inputData.getUserProperty("COMPRESSION_CODEC"));
- }
-}
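
A side note on the class comment above: the single-working-segment rule can be
illustrated without HDFS at all. A minimal standalone sketch (segment ids and
offsets are made-up values), showing why only the segment whose fragment starts
at offset 0 does any reading:

    import java.util.Arrays;
    import java.util.List;

    public class SingleSegmentSketch {
        // Stand-in for the fragment handed to each segment; only the start offset matters here.
        static final class Fragment {
            final int segmentId;
            final long start;
            Fragment(int segmentId, long start) { this.segmentId = segmentId; this.start = start; }
        }

        public static void main(String[] args) {
            List<Fragment> fragments = Arrays.asList(
                    new Fragment(0, 0L),           // starts at 0: this segment streams the whole file
                    new Fragment(1, 67108864L),    // skipped, otherwise every record would duplicate
                    new Fragment(2, 134217728L));  // skipped

            for (Fragment f : fragments) {
                boolean working = (f.start == 0);  // the same check isWorkingSegment() performs
                System.out.println("segment " + f.segmentId
                        + (working ? " reads the whole file" : " returns no records"));
            }
        }
    }
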
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
deleted file mode 100644
index 1bf5aab..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.Fragment;
-import com.pivotal.pxf.api.Fragmenter;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Fragmenter class for HDFS data resources.
- *
- * Given an HDFS data source (a file, directory, or wildcard pattern), divides
- * the data into fragments and returns a list of them along with a list of
- * host:port locations for each.
- */
-public class HdfsDataFragmenter extends Fragmenter {
- private JobConf jobConf;
-
- /**
- * Constructs an HdfsDataFragmenter object.
- *
- * @param md all input parameters coming from the client
- */
- public HdfsDataFragmenter(InputData md) {
- super(md);
-
- jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
- }
-
- /**
- * Gets the fragments for a data source URI that can appear as a file name,
- * a directory name or a wildcard. Returns the data fragments in JSON
- * format.
- */
- @Override
- public List<Fragment> getFragments() throws Exception {
- String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
- InputSplit[] splits = getSplits(new Path(absoluteDataPath));
-
- for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
- FileSplit fsp = (FileSplit) split;
-
- /*
- * HD-2547: If the file is empty, an empty split is returned: no
- * locations and no length.
- */
- if (fsp.getLength() <= 0) {
- continue;
- }
-
- String filepath = fsp.getPath().toUri().getPath();
- String[] hosts = fsp.getLocations();
-
- /*
- * metadata information includes: file split's start, length and
- * hosts (locations).
- */
- byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
- Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
- fragments.add(fragment);
- }
-
- return fragments;
- }
-
- private InputSplit[] getSplits(Path path) throws IOException {
- PxfInputFormat format = new PxfInputFormat();
- PxfInputFormat.setInputPaths(jobConf, path);
- return format.getSplits(jobConf, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
deleted file mode 100644
index 744342d..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.ListIterator;
-
-/**
- * Accessor for accessing splittable HDFS data sources. HDFS will divide the
- * file into splits based on an internal decision (by default, the block size is
- * also the split size).
- *
- * Accessors that require such base functionality should extend this class.
- */
-public abstract class HdfsSplittableDataAccessor extends Plugin implements
- ReadAccessor {
- protected Configuration conf = null;
- protected RecordReader<Object, Object> reader = null;
- protected InputFormat<?, ?> inputFormat = null;
- protected ListIterator<InputSplit> iter = null;
- protected JobConf jobConf = null;
- protected Object key, data;
-
- /**
- * Constructs an HdfsSplittableDataAccessor
- *
- * @param input all input parameters coming from the client request
- * @param inFormat the HDFS {@link InputFormat} the caller wants to use
- */
- public HdfsSplittableDataAccessor(InputData input,
- InputFormat<?, ?> inFormat) {
- super(input);
- inputFormat = inFormat;
-
- // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
- conf = new Configuration();
-
- // 2. variable required for the splits iteration logic
- jobConf = new JobConf(conf, HdfsSplittableDataAccessor.class);
- }
-
- /**
- * Fetches the requested fragment (file split) for the current client
- * request, and sets a record reader for the job.
- *
- * @return true if succeeded, false if no more splits to be read
- */
- @Override
- public boolean openForRead() throws Exception {
- LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
- FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
- requestSplits.add(fileSplit);
-
- // Initialize record reader based on current split
- iter = requestSplits.listIterator(0);
- return getNextSplit();
- }
-
- /**
- * Specialized accessors will override this method and implement their own
- * recordReader. For example, a plain delimited text accessor may want to
- * return a LineRecordReader.
- *
- * @param jobConf the hadoop jobconf to use for the selected InputFormat
- * @param split the input split to be read by the accessor
- * @return a recordreader to be used for reading the data records of the
- * split
- * @throws IOException if recordreader could not be created
- */
- abstract protected Object getReader(JobConf jobConf, InputSplit split)
- throws IOException;
-
- /**
- * Sets the current split and initializes a RecordReader that feeds from the
- * split.
- *
- * @return true if there is a split to read
- * @throws IOException if record reader could not be created
- */
- @SuppressWarnings(value = "unchecked")
- protected boolean getNextSplit() throws IOException {
- if (!iter.hasNext()) {
- return false;
- }
-
- InputSplit currSplit = iter.next();
- reader = (RecordReader<Object, Object>) getReader(jobConf, currSplit);
- key = reader.createKey();
- data = reader.createValue();
- return true;
- }
-
- /**
- * Fetches one record from the file. The record is returned as a Java
- * object.
- */
- @Override
- public OneRow readNextObject() throws IOException {
- // try to read the next record from the current split
- if (!reader.next(key, data)) {
- // the current split is exhausted. try to move to the next split
- if (getNextSplit()) {
- // read the first record of the new split
- if (!reader.next(key, data)) {
- // make sure we return nulls
- return null;
- }
- } else {
- // make sure we return nulls
- return null;
- }
- }
-
- /*
- * reaching this point means a record was read, either from the current
- * split or from the first read of the next split; the null returns above
- * signal the end of the records sequence
- */
- return new OneRow(key, data);
- }
-
- /**
- * Closes the RecordReader when the user has finished reading the file.
- */
- @Override
- public void closeForRead() throws Exception {
- if (reader != null) {
- reader.close();
- }
- }
-
- @Override
- public boolean isThreadSafe() {
- return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
- inputData.getUserProperty("COMPRESSION_CODEC"));
- }
-
-}
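
To make the getReader() contract above concrete: a subclass only needs to hand
back a record reader suited to its format. A hypothetical sketch (the class name
PlainTextAccessor is invented; the real LineBreakAccessor below returns a
ChunkRecordReader instead) that uses Hadoop's stock LineRecordReader for plain
delimited text:

    import java.io.IOException;

    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.TextInputFormat;

    import com.pivotal.pxf.api.utilities.InputData;
    import com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor;

    public class PlainTextAccessor extends HdfsSplittableDataAccessor {

        public PlainTextAccessor(InputData input) {
            super(input, new TextInputFormat());
            ((TextInputFormat) inputFormat).configure(jobConf);
        }

        // Wire Hadoop's line-oriented reader to the split handed to this segment.
        @Override
        protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
            return new LineRecordReader(jobConf, (FileSplit) split);
        }
    }
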
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
deleted file mode 100644
index 32002a1..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.WriteAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * A PXF Accessor for reading delimited plain text records.
- */
-public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
- WriteAccessor {
- private DataOutputStream dos;
- private FSDataOutputStream fsdos;
- private Configuration conf;
- private FileSystem fs;
- private Path file;
- private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
-
- /**
- * Constructs a LineBreakAccessor.
- *
- * @param input all input parameters coming from the client request
- */
- public LineBreakAccessor(InputData input) {
- super(input, new TextInputFormat());
- ((TextInputFormat) inputFormat).configure(jobConf);
- }
-
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split)
- throws IOException {
- return new ChunkRecordReader(jobConf, (FileSplit) split);
- }
-
- /**
- * Opens file for write.
- */
- @Override
- public boolean openForWrite() throws Exception {
-
- String fileName = inputData.getDataSource();
- String compressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
- CompressionCodec codec = null;
-
- conf = new Configuration();
- fs = FileSystem.get(conf);
-
- // get compression codec
- if (compressCodec != null) {
- codec = HdfsUtilities.getCodec(conf, compressCodec);
- String extension = codec.getDefaultExtension();
- fileName += extension;
- }
-
- file = new Path(fileName);
-
- if (fs.exists(file)) {
- throw new IOException("file " + file.toString()
- + " already exists, can't write data");
- }
- org.apache.hadoop.fs.Path parent = file.getParent();
- if (!fs.exists(parent)) {
- fs.mkdirs(parent);
- Log.debug("Created new dir " + parent.toString());
- }
-
- // create output stream - do not allow overwriting existing file
- createOutputStream(file, codec);
-
- return true;
- }
-
- /*
- * Creates output stream from given file. If compression codec is provided,
- * wrap it around stream.
- */
- private void createOutputStream(Path file, CompressionCodec codec)
- throws IOException {
- fsdos = fs.create(file, false);
- if (codec != null) {
- dos = new DataOutputStream(codec.createOutputStream(fsdos));
- } else {
- dos = fsdos;
- }
-
- }
-
- /**
- * Writes row into stream.
- */
- @Override
- public boolean writeNextObject(OneRow onerow) throws Exception {
- dos.write((byte[]) onerow.getData());
- return true;
- }
-
- /**
- * Closes the output stream after done writing.
- */
- @Override
- public void closeForWrite() throws Exception {
- if ((dos != null) && (fsdos != null)) {
- Log.debug("Closing writing stream for path " + file);
- dos.flush();
- /*
- * From release 0.21.0 sync() is deprecated in favor of hflush(),
- * which only guarantees that new readers will see all data written
- * to that point, and hsync(), which makes a stronger guarantee that
- * the operating system has flushed the data to disk (like POSIX
- * fsync), although data may still be in the disk cache.
- */
- fsdos.hsync();
- dos.close();
- }
- }
-}
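
The hflush()/hsync() note in closeForWrite() applies to any FSDataOutputStream,
not only the one created here. A minimal sketch, assuming hadoop-common on the
classpath and using the local file system in place of HDFS (the output path is
hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HsyncSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.getLocal(conf);       // local FS stands in for HDFS
            Path file = new Path("/tmp/hsync-sketch.txt");   // hypothetical output path

            // create(..., false): fail rather than overwrite, mirroring the policy above
            try (FSDataOutputStream out = fs.create(file, false)) {
                out.write("one line of data\n".getBytes("UTF-8"));
                out.hflush();  // new readers can now see the data
                out.hsync();   // additionally ask the OS to flush to disk (POSIX fsync-like)
            }
        }
    }
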
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
deleted file mode 100644
index 13880b8..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * An (atomic) PXF Accessor for reading \n-delimited files with quoted
- * field delimiter, line delimiter, and quotes. This accessor supports
- * multi-line records that are read from a single source (non-parallel).
- */
-public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
- private BufferedReader reader;
-
- /**
- * Constructs a QuotedLineBreakAccessor.
- *
- * @param input all input parameters coming from the client request
- */
- public QuotedLineBreakAccessor(InputData input) {
- super(input);
- }
-
- @Override
- public boolean openForRead() throws Exception {
- if (!super.openForRead()) {
- return false;
- }
- reader = new BufferedReader(new InputStreamReader(inp));
- return true;
- }
-
- /**
- * Fetches one record (maybe partial) from the file. The record is returned as a Java object.
- */
- @Override
- public OneRow readNextObject() throws IOException {
- if (super.readNextObject() == null) /* check if working segment */ {
- return null;
- }
-
- String next_line = reader.readLine();
- if (next_line == null) /* EOF */ {
- return null;
- }
-
- return new OneRow(null, next_line);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
deleted file mode 100644
index 5f3f3dd..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.WriteAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-import java.util.EnumSet;
-
-/**
- * A PXF Accessor for reading and writing Sequence File records
- */
-public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
- WriteAccessor {
-
- private Configuration conf;
- private FileContext fc;
- private Path file;
- private CompressionCodec codec;
- private CompressionType compressionType;
- private SequenceFile.Writer writer;
- private LongWritable defaultKey; // used when recordkey is not defined
-
- private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);
-
- /**
- * Constructs a SequenceFileAccessor.
- *
- * @param input all input parameters coming from the client request
- */
- public SequenceFileAccessor(InputData input) {
- super(input, new SequenceFileInputFormat<Writable, Writable>());
- }
-
- /**
- * Overrides virtual method to create specialized record reader
- */
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split)
- throws IOException {
- return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
- }
-
- @Override
- public boolean openForWrite() throws Exception {
- FileSystem fs;
- Path parent;
- String fileName = inputData.getDataSource();
- conf = new Configuration();
-
- getCompressionCodec(inputData);
- fileName = updateFileExtension(fileName, codec);
-
- // construct the output stream
- file = new Path(fileName);
- fs = file.getFileSystem(conf);
- fc = FileContext.getFileContext();
- defaultKey = new LongWritable(inputData.getSegmentId());
-
- if (fs.exists(file)) {
- throw new IOException("file " + file
- + " already exists, can't write data");
- }
- parent = file.getParent();
- if (!fs.exists(parent)) {
- fs.mkdirs(parent);
- Log.debug("Created new dir " + parent);
- }
-
- writer = null;
- return true;
- }
-
- /**
- * Compression: based on compression codec and compression type (default
- * value RECORD). If there is no codec, compression type is ignored, and
- * NONE is used.
- *
- * @param inputData - container where compression codec and type are held
- */
- private void getCompressionCodec(InputData inputData) {
-
- String userCompressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
- String userCompressType = inputData.getUserProperty("COMPRESSION_TYPE");
- String parsedCompressType = parseCompressionType(userCompressType);
-
- compressionType = SequenceFile.CompressionType.NONE;
- codec = null;
- if (userCompressCodec != null) {
- codec = HdfsUtilities.getCodec(conf, userCompressCodec);
-
- try {
- compressionType = CompressionType.valueOf(parsedCompressType);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- "Illegal value for compression type " + "'"
- + parsedCompressType + "'");
- }
- if (compressionType == null) {
- throw new IllegalArgumentException(
- "Compression type must be defined");
- }
-
- Log.debug("Compression ON: " + "compression codec: "
- + userCompressCodec + ", compression type: "
- + compressionType);
- }
- }
-
- /*
- * Parses compression type for sequence file. If null, default to RECORD.
- * Allowed values: RECORD, BLOCK.
- */
- private String parseCompressionType(String compressType) {
- final String COMPRESSION_TYPE_RECORD = "RECORD";
- final String COMPRESSION_TYPE_BLOCK = "BLOCK";
- final String COMPRESSION_TYPE_NONE = "NONE";
-
- if (compressType == null) {
- return COMPRESSION_TYPE_RECORD;
- }
-
- if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
- throw new IllegalArgumentException(
- "Illegal compression type 'NONE'. "
- + "For disabling compression remove COMPRESSION_CODEC parameter.");
- }
-
- if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
- && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
- throw new IllegalArgumentException("Illegal compression type '"
- + compressType + "'");
- }
-
- return compressType.toUpperCase();
- }
-
- /*
- * Returns fileName with the codec's file extension appended
- */
- private String updateFileExtension(String fileName, CompressionCodec codec) {
-
- if (codec != null) {
- fileName += codec.getDefaultExtension();
- }
- Log.debug("File name for write: " + fileName);
- return fileName;
- }
-
- @Override
- public boolean writeNextObject(OneRow onerow) throws IOException {
- Writable value = (Writable) onerow.getData();
- Writable key = (Writable) onerow.getKey();
-
- // initialize the writer on first call here, based on the type of onerow.getData()
- // TODO: verify data is serializable.
- if (writer == null) {
- Class<? extends Writable> valueClass = value.getClass();
- Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
- : key.getClass();
- // create writer - do not allow overwriting existing file
- writer = SequenceFile.createWriter(fc, conf, file, keyClass,
- valueClass, compressionType, codec,
- new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
- }
-
- try {
- writer.append((key == null) ? defaultKey : key, value);
- } catch (IOException e) {
- Log.error("Failed to write data to file: " + e.getMessage());
- return false;
- }
-
- return true;
- }
-
- @Override
- public void closeForWrite() throws Exception {
- if (writer != null) {
- writer.sync();
- /*
- * From release 0.21.0 sync() is deprecated in favor of hflush(),
- * which only guarantees that new readers will see all data written
- * to that point, and hsync(), which makes a stronger guarantee that
- * the operating system has flushed the data to disk (like POSIX
- * fsync), although data may still be in the disk cache.
- */
- writer.hsync();
- writer.close();
- }
- }
-
- public CompressionType getCompressionType() {
- return compressionType;
- }
-
- public CompressionCodec getCodec() {
- return codec;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
deleted file mode 100644
index 7ace4c8..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadResolver;
-import com.pivotal.pxf.api.WriteResolver;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.pivotal.pxf.api.io.DataType.VARCHAR;
-
-/**
- * StringPassResolver handles "deserialization" and serialization of
- * String records. StringPassResolver implements the ReadResolver and
- * WriteResolver interfaces. Returns strings as-is.
- */
-public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
- // for write
- private OneRow oneRow;
-
- /**
- * Constructs a StringPassResolver.
- *
- * @param inputData all input parameters coming from the client request
- */
- public StringPassResolver(InputData inputData) {
- super(inputData);
- oneRow = new OneRow();
- this.inputData = inputData;
- }
-
- /**
- * Returns a list of the fields of one record.
- * Each record field is represented by a {@link OneField} item.
- * OneField item contains two fields: an integer representing the field type and a Java
- * Object representing the field value.
- */
- @Override
- public List<OneField> getFields(OneRow onerow) {
- /*
- * This call forces a whole text line into a single varchar field, replacing
- * the proper field separation (that code can be found in previous revisions).
- * The reasons for doing so at this point are:
- * 1. performance
- * 2. the desire not to replicate text parsing logic from the backend into Java
- */
- List<OneField> record = new LinkedList<OneField>();
- Object data = onerow.getData();
- if (data instanceof ChunkWritable) {
- record.add(new OneField(DataType.BYTEA.getOID(), ((ChunkWritable)data).box));
- }
- else {
- record.add(new OneField(VARCHAR.getOID(), data));
- }
- return record;
- }
-
- /**
- * Creates a OneRow object from the singleton list.
- */
- @Override
- public OneRow setFields(List<OneField> record) throws Exception {
- if (((byte[]) record.get(0).val).length == 0) {
- return null;
- }
-
- oneRow.setData(record.get(0).val);
- return oneRow;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
deleted file mode 100644
index e9df907..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
+++ /dev/null
@@ -1,220 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.*;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
-import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException;
-import com.pivotal.pxf.service.utilities.Utilities;
-
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.pivotal.pxf.api.io.DataType.*;
-
-/**
- * WritableResolver handles serialization and deserialization of records
- * that were serialized using Hadoop's Writable serialization framework.
- *
- * A field named 'recordkey' is treated as a key of the given row, and not as
- * part of the data schema. See {@link RecordkeyAdapter}.
- */
-public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
- private static final int RECORDKEY_UNDEFINED = -1;
- private static final Log LOG = LogFactory.getLog(WritableResolver.class);
- private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
- private int recordkeyIndex;
- // reflection fields
- private Object userObject = null;
- private Field[] fields = null;
-
-
- /**
- * Constructs a WritableResolver.
- *
- * @param input all input parameters coming from the client
- * @throws Exception if schema file is missing, cannot be found in
- * classpath or fails to instantiate
- */
- public WritableResolver(InputData input) throws Exception {
- super(input);
-
- String schemaName = inputData.getUserProperty("DATA-SCHEMA");
-
- /** Testing that the schema name was supplied by the user - schema is an optional property. */
- if (schemaName == null) {
- throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName());
- }
-
- /** Testing that the schema resource exists. */
- if (!isSchemaOnClasspath(schemaName)) {
- throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
- }
-
- userObject = Utilities.createAnyInstance(schemaName);
- fields = userObject.getClass().getDeclaredFields();
- recordkeyIndex = (inputData.getRecordkeyColumn() == null)
- ? RECORDKEY_UNDEFINED
- : inputData.getRecordkeyColumn().columnIndex();
-
- // fields details:
- if (LOG.isDebugEnabled()) {
- for (int i = 0; i < fields.length; i++) {
- Field field = fields[i];
- String javaType = field.getType().getName();
- boolean isPrivate = Modifier.isPrivate(field.getModifiers());
-
- LOG.debug("Field #" + i + ", name: " + field.getName() +
- " type: " + javaType + ", " +
- (isArray(javaType) ? "Array" : "Primitive") + ", " +
- (isPrivate ? "Private" : "accessible") + " field");
- }
- }
- }
-
- private boolean isArray(String javaType) {
- return (javaType.startsWith("[") && !"[B".equals(javaType));
- }
-
- @Override
- public List<OneField> getFields(OneRow onerow) throws Exception {
- userObject = onerow.getData();
- List<OneField> record = new LinkedList<OneField>();
-
- int currentIdx = 0;
- for (Field field : fields) {
- if (currentIdx == recordkeyIndex) {
- currentIdx += recordkeyAdapter.appendRecordkeyField(record, inputData, onerow);
- }
-
- if (Modifier.isPrivate(field.getModifiers())) {
- continue;
- }
-
- currentIdx += populateRecord(record, field);
- }
-
- return record;
- }
-
- int setArrayField(List<OneField> record, int dataType, Field reflectedField) throws IllegalAccessException {
- Object array = reflectedField.get(userObject);
- int length = Array.getLength(array);
- for (int j = 0; j < length; j++) {
- record.add(new OneField(dataType, Array.get(array, j)));
- }
- return length;
- }
-
- /*
- * Given a java Object type, convert it to the corresponding output field
- * type.
- */
- private DataType convertJavaToGPDBType(String type) {
- if ("boolean".equals(type) || "[Z".equals(type)) {
- return BOOLEAN;
- }
- if ("int".equals(type) || "[I".equals(type)) {
- return INTEGER;
- }
- if ("double".equals(type) || "[D".equals(type)) {
- return FLOAT8;
- }
- if ("java.lang.String".equals(type) || "[Ljava.lang.String;".equals(type)) {
- return TEXT;
- }
- if ("float".equals(type) || "[F".equals(type)) {
- return REAL;
- }
- if ("long".equals(type) || "[J".equals(type)) {
- return BIGINT;
- }
- if ("[B".equals(type)) {
- return BYTEA;
- }
- if ("short".equals(type) || "[S".equals(type)) {
- return SMALLINT;
- }
- throw new UnsupportedTypeException("Type " + type + " is not supported by GPDBWritable");
- }
-
- int populateRecord(List<OneField> record, Field field) throws BadRecordException {
- String javaType = field.getType().getName();
- try {
- DataType dataType = convertJavaToGPDBType(javaType);
- if (isArray(javaType)) {
- return setArrayField(record, dataType.getOID(), field);
- }
- record.add(new OneField(dataType.getOID(), field.get(userObject)));
- return 1;
- } catch (IllegalAccessException ex) {
- throw new BadRecordException(ex);
- }
- }
-
- /**
- * Sets customWritable fields and creates a OneRow object.
- */
- @Override
- public OneRow setFields(List<OneField> record) throws Exception {
- Writable key = null;
-
- int colIdx = 0;
- for (Field field : fields) {
- /*
- * extract recordkey based on the column descriptor type
- * and add to OneRow.key
- */
- if (colIdx == recordkeyIndex) {
- key = recordkeyAdapter.convertKeyValue(record.get(colIdx).val);
- colIdx++;
- }
-
- if (Modifier.isPrivate(field.getModifiers())) {
- continue;
- }
-
- String javaType = field.getType().getName();
- convertJavaToGPDBType(javaType);
- if (isArray(javaType)) {
- Object value = field.get(userObject);
- int length = Array.getLength(value);
- for (int j = 0; j < length; j++, colIdx++) {
- Array.set(value, j, record.get(colIdx).val);
- }
- } else {
- field.set(userObject, record.get(colIdx).val);
- colIdx++;
- }
- }
-
- return new OneRow(key, userObject);
- }
-
- /*
- * Tests whether the schema resource is a file on the classpath (e.g. avro_schema.avsc)
- * or a Java class, in which case we try to load it by name.
- */
- private boolean isSchemaOnClasspath(String resource) {
- if (this.getClass().getClassLoader().getResource("/" + resource) != null) {
- return true;
- }
-
- try {
- Class.forName(resource);
- return true;
- } catch (ClassNotFoundException e) {
- return false;
- }
- }
-}
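
For orientation, WritableResolver expects the DATA-SCHEMA option to name a class
it can instantiate and reflect over: public fields become record fields (private
ones are skipped), array fields are expanded element by element, and a field
named recordkey is delegated to RecordkeyAdapter. A hypothetical schema class of
that shape (the name DemoSchema and its fields are invented for illustration):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class DemoSchema implements Writable {
        public int id;                              // maps to INTEGER
        public String name = "";                    // maps to TEXT
        public double[] readings = new double[3];   // each element maps to FLOAT8
        private int internalCounter;                // private: ignored by WritableResolver

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            for (double r : readings) {
                out.writeDouble(r);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            for (int i = 0; i < readings.length; i++) {
                readings[i] = in.readDouble();
            }
        }
    }
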
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
deleted file mode 100644
index db0c5ea..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-/**
- * Thrown when there is a data schema problem detected by any plugin that
- * requires a schema.
- * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_ON_CLASSPATH} when the specified schema is missing from the CLASSPATH.
- * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_INDICATED} when a schema was required but was not specified in the pxf uri.
- */
-public class DataSchemaException extends RuntimeException {
- public static enum MessageFmt {
- SCHEMA_NOT_INDICATED("%s requires a data schema to be specified in the " +
- "pxf uri, but none was found. Please supply it " +
- "using the DATA-SCHEMA option"),
- SCHEMA_NOT_ON_CLASSPATH("schema resource \"%s\" is not located on the classpath");
-
- String format;
-
- MessageFmt(String format) {
- this.format = format;
- }
-
- public String getFormat() {
- return format;
- }
- }
-
- private MessageFmt msgFormat;
-
- /**
- * Constructs a DataSchemaException.
- *
- * @param msgFormat the message format
- * @param msgArgs the message arguments
- */
- public DataSchemaException(MessageFmt msgFormat, String... msgArgs) {
- super(String.format(msgFormat.getFormat(), (Object[]) msgArgs));
- this.msgFormat = msgFormat;
- }
-
- public MessageFmt getMsgFormat() {
- return msgFormat;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
deleted file mode 100644
index c7fe103..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.service.utilities.Utilities;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.utilities.InputData;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.*;
-import java.util.List;
-
-/**
- * HdfsUtilities class exposes helper methods for PXF classes.
- */
-public class HdfsUtilities {
- private static Log Log = LogFactory.getLog(HdfsUtilities.class);
- private static Configuration config = new Configuration();
- private static CompressionCodecFactory factory = new CompressionCodecFactory(
- config);
-
- /**
- * HDFS data sources are absolute data paths. This method ensures that dataSource
- * begins with '/'.
- *
- * @param dataSource The HDFS path to a file or directory of interest.
- * Retrieved from the client request.
- * @return an absolute data path
- */
- public static String absoluteDataPath(String dataSource) {
- return (dataSource.charAt(0) == '/') ? dataSource : "/" + dataSource;
- }
-
- /*
- * Helper routine to get a compression codec class
- */
- private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
- String name) {
-
- Class<? extends CompressionCodec> codecClass;
- try {
- codecClass = conf.getClassByName(name).asSubclass(
- CompressionCodec.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Compression codec " + name
- + " was not found.", e);
- }
- return codecClass;
- }
-
- /**
- * Helper routine to get compression codec through reflection.
- *
- * @param conf configuration used for reflection
- * @param name codec name
- * @return generated CompressionCodec
- */
- public static CompressionCodec getCodec(Configuration conf, String name) {
- return ReflectionUtils.newInstance(getCodecClass(conf, name), conf);
- }
-
- /**
- * Helper routine to get compression codec class by path (file suffix).
- *
- * @param path path of file to get codec for
- * @return matching codec class for the path. null if no codec is needed.
- */
- private static Class<? extends CompressionCodec> getCodecClassByPath(String path) {
-
- Class<? extends CompressionCodec> codecClass = null;
- CompressionCodec codec = factory.getCodec(new Path(path));
- if (codec != null) {
- codecClass = codec.getClass();
- }
- Log.debug((codecClass == null ? "No codec" : "Codec " + codecClass)
- + " was found for file " + path);
- return codecClass;
- }
-
- /**
- * Returns true if the needed codec is splittable. If no codec is needed
- * returns true as well.
- *
- * @param path path of the file to be read
- * @return if the codec needed for reading the specified path is splittable.
- */
- public static boolean isSplittableCodec(Path path) {
-
- final CompressionCodec codec = factory.getCodec(path);
- if (null == codec) {
- return true;
- }
-
- return codec instanceof SplittableCompressionCodec;
- }
-
- /**
- * Checks whether requests should be handled in a single thread or not.
- *
- * @param dataDir hdfs path to the data source
- * @param compCodec the fully qualified name of the compression codec
- * @return if the request can be run in multi-threaded mode.
- */
- public static boolean isThreadSafe(String dataDir, String compCodec) {
-
- Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
- config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
- /* bzip2 codec is not thread safe */
- return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
- }
-
- /**
- * Prepares byte serialization of a file split information (start, length,
- * hosts) using {@link ObjectOutputStream}.
- *
- * @param fsp file split to be serialized
- * @return byte serialization of fsp
- * @throws IOException if I/O errors occur while writing to the underlying
- * stream
- */
- public static byte[] prepareFragmentMetadata(FileSplit fsp)
- throws IOException {
- ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(
- byteArrayStream);
- objectStream.writeLong(fsp.getStart());
- objectStream.writeLong(fsp.getLength());
- objectStream.writeObject(fsp.getLocations());
-
- return byteArrayStream.toByteArray();
- }
-
- /**
- * Parses fragment metadata and return matching {@link FileSplit}.
- *
- * @param inputData request input data
- * @return FileSplit with fragment metadata
- */
- public static FileSplit parseFragmentMetadata(InputData inputData) {
- try {
- byte[] serializedLocation = inputData.getFragmentMetadata();
- if (serializedLocation == null) {
- throw new IllegalArgumentException(
- "Missing fragment location information");
- }
-
- ByteArrayInputStream bytesStream = new ByteArrayInputStream(
- serializedLocation);
- ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
- long start = objectStream.readLong();
- long end = objectStream.readLong();
-
- String[] hosts = (String[]) objectStream.readObject();
-
- FileSplit fileSplit = new FileSplit(new Path(
- inputData.getDataSource()), start, end, hosts);
-
- Log.debug("parsed file split: path " + inputData.getDataSource()
- + ", start " + start + ", end " + end + ", hosts "
- + ArrayUtils.toString(hosts));
-
- return fileSplit;
-
- } catch (Exception e) {
- throw new RuntimeException(
- "Exception while reading expected fragment metadata", e);
- }
- }
-
- /**
- * Accessing the Avro file through the "unsplittable" API just to get the
- * schema. The splittable API (AvroInputFormat), which is the one we will be
- * using to fetch the records, does not support getting the Avro schema yet.
- *
- * @param conf Hadoop configuration
- * @param dataSource Avro file (i.e fileName.avro) path
- * @return the Avro schema
- * @throws IOException if an I/O error occurred while accessing the Avro schema file
- */
- public static Schema getAvroSchema(Configuration conf, String dataSource)
- throws IOException {
- FsInput inStream = new FsInput(new Path(dataSource), conf);
- DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
- DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
- inStream, dummyReader);
- Schema schema = dummyFileReader.getSchema();
- dummyFileReader.close();
- return schema;
- }
-
- /**
- * Returns string serialization of list of fields. Fields of binary type
- * (BYTEA) are converted to octal representation to make sure they will be
- * relayed properly to the DB.
- *
- * @param complexRecord list of fields to be stringified
- * @param delimiter delimiter between fields
- * @return string of serialized fields using delimiter
- */
- public static String toString(List<OneField> complexRecord, String delimiter) {
- StringBuilder buff = new StringBuilder();
- String delim = ""; // first iteration has no delimiter
- for (OneField complex : complexRecord) {
- if (complex.type == DataType.BYTEA.getOID()) {
- /** Serialize byte array as string */
- buff.append(delim);
- Utilities.byteArrayToOctalString((byte[]) complex.val, buff);
- } else {
- buff.append(delim).append(complex.val);
- }
- delim = delimiter;
- }
- return buff.toString();
- }
-}
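
prepareFragmentMetadata() and parseFragmentMetadata() above fix the metadata
layout: two longs (start, length) followed by the hosts array, written with
ObjectOutputStream. A minimal round-trip sketch using only java.io, with made-up
values (the real parse wraps the result in a FileSplit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.Arrays;

    public class FragmentMetadataSketch {
        public static void main(String[] args) throws Exception {
            // serialize in the same field order as prepareFragmentMetadata
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeLong(0L);                                  // split start
            out.writeLong(134217728L);                          // split length
            out.writeObject(new String[] {"host1", "host2"});   // split locations
            out.flush();                                        // drain buffered data into the byte array
            byte[] metadata = bytes.toByteArray();

            // deserialize in the same field order as parseFragmentMetadata
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(metadata));
            long start = in.readLong();
            long length = in.readLong();
            String[] hosts = (String[]) in.readObject();

            System.out.println("start=" + start + " length=" + length
                    + " hosts=" + Arrays.toString(hosts));
        }
    }
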
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
deleted file mode 100644
index 958495c..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-
-/**
- * PxfInputFormat is not intended to read a specific format, hence it implements
- * a dummy getRecordReader. Instead, its purpose is to apply
- * FileInputFormat.getSplits from one point in PXF and get splits that are
- * valid for the actual InputFormats, since all of the InputFormats we use
- * inherit from FileInputFormat but do not override getSplits.
- */
-public class PxfInputFormat extends FileInputFormat {
-
- @Override
- public RecordReader getRecordReader(InputSplit split,
- JobConf conf,
- Reporter reporter) throws IOException {
- throw new UnsupportedOperationException("PxfInputFormat should not be used for reading data, but only for obtaining the splits of a file");
- }
-
- /*
- * Return true if this file can be split.
- */
- @Override
- protected boolean isSplitable(FileSystem fs, Path filename) {
- return HdfsUtilities.isSplittableCodec(filename);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
deleted file mode 100644
index a88ade0..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
-
-import java.util.List;
-
-/**
- * Adapter used for adding a recordkey field to the records output {@code List<OneField>}.
- */
-public class RecordkeyAdapter {
- private Log Log;
-
- /*
- * We need to transform Record keys to java primitive types.
- * Since the type of the key is the same throughout the file we do the type resolution
- * in the first call (for the first record) and then use a "Java variation on Function pointer"
- * to do the extraction for the rest of the records.
- */
- private interface ValExtractor {
- public Object get(Object key);
- }
-
- private ValExtractor extractor = null;
-
- private interface ValConverter {
- public Writable get(Object key);
- }
-
- private ValConverter converter = null;
-
- /**
- * Constructs a RecordkeyAdapter.
- */
- public RecordkeyAdapter() {
- Log = LogFactory.getLog(RecordkeyAdapter.class);
- }
-
- /**
- * Adds the recordkey to the end of the passed in recFields list.
- * <p>
- * This method also verifies cases in which record keys are not supported
- * by the underlying source type, and therefore "illegally" requested.
- *
- * @param recFields existing list of record (non-key) fields and their values.
- * @param input all input parameters coming from the client request
- * @param onerow a row object which is used here in order to find out if
- * the given type supports recordkeys or not.
- * @return 0 if record key not needed, or 1 if record key was appended
- * @throws NoSuchFieldException when the given record type does not support
- * recordkeys
- */
- public int appendRecordkeyField(List<OneField> recFields,
- InputData input,
- OneRow onerow) throws NoSuchFieldException {
-
- /*
- * user did not request the recordkey field in the
- * "create external table" statement
- */
- ColumnDescriptor recordkeyColumn = input.getRecordkeyColumn();
- if (recordkeyColumn == null) {
- return 0;
- }
-
- /*
- * The recordkey was filled in the fileAccessor during execution of
- * method readNextObject. The current accessor implementations are
- * SequenceFileAccessor, LineBreakAccessor and AvroFileAccessor from
- * HdfsSplittableDataAccessor and QuotedLineBreakAccessor from
- * HdfsAtomicDataAccessor. For SequenceFileAccessor, LineBreakAccessor
- * the recordkey is set, since it is returned by the
- * SequenceFileRecordReader or LineRecordReader (for text files). But Avro
- * files do not have keys, so the AvroRecordReader will not return a key
- * and in this case recordkey will be null. If the user specified a
- * recordkey attribute in the CREATE EXTERNAL TABLE statement and
- * reads from an Avro file, we will throw an exception since the Avro
- * file does not have keys. In the future, additional implementations of
- * FileAccessors will have to set recordkey during readNextObject().
- * Otherwise it is null by default and we will throw an exception here,
- * that is if we get here... a careful user will not specify recordkey
- * in the CREATE EXTERNAL statement and then we will leave this function
- * one line above.
- */
- Object recordkey = onerow.getKey();
- if (recordkey == null) {
- throw new NoSuchFieldException("Value for field \"recordkey\" was requested but the queried HDFS resource type does not support key");
- }
-
- OneField oneField = new OneField();
- oneField.type = recordkeyColumn.columnTypeCode();
- oneField.val = extractVal(recordkey);
- recFields.add(oneField);
- return 1;
- }
-
- /*
- * Extracts a Java primitive type value from the recordkey. If the key is a
- * Writable implementation, we extract the value as a Java primitive. If the
- * key is already a Java primitive, we return it as is. If it is an unknown
- * type, we throw an exception.
- */
- private Object extractVal(Object key) {
- if (extractor == null) {
- extractor = InitializeExtractor(key);
- }
-
- return extractor.get(key);
- }
-
- /*
- * Initialize the extractor object based on the type of the recordkey
- */
- private ValExtractor InitializeExtractor(Object key) {
- if (key instanceof IntWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((IntWritable) key).get();
- }
- };
- } else if (key instanceof ByteWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((ByteWritable) key).get();
- }
- };
- } else if (key instanceof BooleanWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((BooleanWritable) key).get();
- }
- };
- } else if (key instanceof DoubleWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((DoubleWritable) key).get();
- }
- };
- } else if (key instanceof FloatWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((FloatWritable) key).get();
- }
- };
- } else if (key instanceof LongWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((LongWritable) key).get();
- }
- };
- } else if (key instanceof Text) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return (key).toString();
- }
- };
- } else if (key instanceof VIntWritable) {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- return ((VIntWritable) key).get();
- }
- };
- } else {
- return new ValExtractor() {
- @Override
- public Object get(Object key) {
- throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
- }
- };
- }
- }
-
- /**
- * Converts a given key object to its matching Writable.
- * Supported types: Integer, Byte, Boolean, Double, Float, Long, String.
- * The type is only checked once, based on the first key; all subsequent
- * calls must pass a key of the same type.
- *
- * @param key object to convert
- * @return Writable object matching given key
- */
- public Writable convertKeyValue(Object key) {
- if (converter == null) {
- converter = initializeConverter(key);
- Log.debug("converter initialized for type " + key.getClass() +
- " (key value: " + key + ")");
- }
-
- return converter.get(key);
- }
-
- private ValConverter initializeConverter(Object key) {
-
- if (key instanceof Integer) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new IntWritable((Integer) key));
- }
- };
- } else if (key instanceof Byte) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new ByteWritable((Byte) key));
- }
- };
- } else if (key instanceof Boolean) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new BooleanWritable((Boolean) key));
- }
- };
- } else if (key instanceof Double) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new DoubleWritable((Double) key));
- }
- };
- } else if (key instanceof Float) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new FloatWritable((Float) key));
- }
- };
- } else if (key instanceof Long) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new LongWritable((Long) key));
- }
- };
- } else if (key instanceof String) {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- return (new Text((String) key));
- }
- };
- } else {
- return new ValConverter() {
- @Override
- public Writable get(Object key) {
- throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
- }
- };
- }
- }
-}
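
The adapter removed above relies on a small pattern worth seeing in isolation: the key type is resolved once, on the first record, and the resulting extractor is reused for every subsequent record of the same type. The sketch below is illustrative only; the class name LazyKeyExtractorExample and the lambda style are not part of the PXF code base.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    // Illustrative sketch (not PXF code): resolve the key type on the first
    // record, then reuse the cached extractor for all following records.
    public class LazyKeyExtractorExample {

        private interface ValExtractor {
            Object get(Object key);
        }

        private ValExtractor extractor;

        public Object extract(Object key) {
            if (extractor == null) {
                extractor = resolve(key); // type resolution happens only once
            }
            return extractor.get(key);
        }

        private ValExtractor resolve(Object key) {
            if (key instanceof IntWritable) {
                return k -> ((IntWritable) k).get();
            } else if (key instanceof LongWritable) {
                return k -> ((LongWritable) k).get();
            } else if (key instanceof Text) {
                return k -> k.toString();
            }
            throw new UnsupportedOperationException(
                    "Unsupported recordkey data type " + key.getClass().getName());
        }

        public static void main(String[] args) {
            LazyKeyExtractorExample example = new LazyKeyExtractorExample();
            System.out.println(example.extract(new IntWritable(42))); // 42
            System.out.println(example.extract(new IntWritable(7)));  // reuses the cached extractor
        }
    }

The removed RecordkeyAdapter applies the same idea twice: once for extracting Java primitives from Writable keys (ValExtractor) and once for the reverse conversion (ValConverter).
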
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
new file mode 100644
index 0000000..fab51ca
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
@@ -0,0 +1,77 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.*;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
+
+/**
+ * A PXF Accessor for reading Avro File records
+ */
+public class AvroFileAccessor extends HdfsSplittableDataAccessor {
+ private AvroWrapper<GenericRecord> avroWrapper = null;
+
+ /**
+ * Constructs an AvroFileAccessor that creates the job configuration and
+ * accesses the Avro file to fetch the Avro schema.
+ *
+ * @param input all input parameters coming from the client
+ * @throws Exception if getting the avro schema fails
+ */
+ public AvroFileAccessor(InputData input) throws Exception {
+ // 1. Call the base class
+ super(input, new AvroInputFormat<GenericRecord>());
+
+ // 2. Access the Avro file through the "unsplittable" API just to get the schema.
+ // The splittable API (AvroInputFormat), which is the one we will be using to fetch
+ // the records, does not support getting the Avro schema yet.
+ Schema schema = getAvroSchema(conf, inputData.getDataSource());
+
+ // 3. Pass the schema to the AvroInputFormat
+ AvroJob.setInputSchema(jobConf, schema);
+
+ // 4. Create the avroWrapper required for the iteration
+ avroWrapper = new AvroWrapper<GenericRecord>();
+ }
+
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+ return new AvroRecordReader<Object>(jobConf, (FileSplit) split);
+ }
+
+ /**
+ * readNextObject
+ * The Avro accessor is currently the only specialized accessor that
+ * overrides this method. This is because of the special
+ * AvroRecordReader.next() semantics (use of the AvroWrapper), which
+ * prevent it from using the RecordReader's default implementation in
+ * SplittableFileAccessor.
+ */
+ @Override
+ public OneRow readNextObject() throws IOException {
+ /* Reset datum to null, to avoid stale bytes from the previous row's datum being carried over */
+ avroWrapper.datum(null);
+ if (reader.next(avroWrapper, NullWritable.get())) { // There is one more record in the current split.
+ return new OneRow(null, avroWrapper.datum());
+ } else if (getNextSplit()) { // The current split is exhausted. try to move to the next split.
+ return reader.next(avroWrapper, NullWritable.get())
+ ? new OneRow(null, avroWrapper.datum())
+ : null;
+ }
+
+ // If neither condition was met, we have already read all the records in all the splits,
+ // and no record was set in this call, so we return null to signal the end of the
+ // record sequence. In this case avroWrapper.datum() will be null.
+ return null;
+ }
+}
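
The constructor above delegates schema discovery to HdfsUtilities.getAvroSchema(), whose body is outside this hunk. As a rough sketch of what such an "unsplittable" schema lookup can look like (the method below is an assumption for illustration, not the actual PXF implementation, and it handles only a single file, not a directory or wildcard pattern), the schema embedded in an Avro container file on HDFS can be read through DataFileReader:

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch (not PXF code): open an Avro container file on HDFS
    // through the non-splittable DataFileReader API and return its schema.
    public class AvroSchemaLookupExample {

        public static Schema readSchema(Configuration conf, String dataSource) throws IOException {
            Path path = new Path(dataSource);
            try (DataFileReader<GenericRecord> reader =
                         new DataFileReader<>(new FsInput(path, conf),
                                              new GenericDatumReader<GenericRecord>())) {
                return reader.getSchema();
            }
        }

        public static void main(String[] args) throws IOException {
            // "/tmp/example.avro" is a hypothetical path; any Avro container file will do
            Schema schema = readSchema(new Configuration(), "/tmp/example.avro");
            System.out.println(schema.toString(true));
        }
    }

Once the schema is known, it is passed to AvroJob.setInputSchema() exactly as in the constructor above, so the splittable AvroInputFormat path can decode records against it.
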