Posted to commits@hawq.apache.org by nh...@apache.org on 2015/10/08 20:37:27 UTC
[4/5] incubator-hawq git commit: HAWQ-28. JavaDoc fixes for PXF
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
index 48fb5b5..13a3546 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
@@ -1,22 +1,20 @@
package com.pivotal.pxf.plugins.hdfs;
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
import java.io.IOException;
-import java.io.InputStream;
-import java.lang.IllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -25,241 +23,259 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
-import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
-
/**
- * ChunkRecordReader is designed for fast reading of a file split. The idea is to bring chunks of
- * data instead of single records. The chunks contain many records and the chunk end is not aligned
- * on a record boundary. The size of the chunk is a class hardcoded parameter - CHUNK_SIZE.
- * This behaviour sets this reader apart from the other readers which will fetch one record and
- * stop when reaching a record delimiter.
+ * ChunkRecordReader is designed for fast reading of a file split. The idea is
+ * to bring chunks of data instead of single records. The chunks contain many
+ * records and the chunk end is not aligned on a record boundary. The size of
+ * the chunk is a class hardcoded parameter - CHUNK_SIZE. This behaviour sets
+ * this reader apart from the other readers which will fetch one record and stop
+ * when reaching a record delimiter.
*/
-public class ChunkRecordReader implements RecordReader<LongWritable, ChunkWritable> {
- private static final Log LOG
- = LogFactory.getLog(ChunkRecordReader.class.getName());
+public class ChunkRecordReader implements
+ RecordReader<LongWritable, ChunkWritable> {
+ private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
+
+ private CompressionCodecFactory compressionCodecs = null;
+ private long start;
+ private long pos;
+ private long end;
+ private long fileLength;
+ private ChunkReader in;
+ private FSDataInputStream fileIn;
+ private final Seekable filePosition;
+ private int maxLineLength;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private static final int CHUNK_SIZE = 1024 * 1024;
+
+ /**
+ * Translates the FSDataInputStream into a DFSInputStream.
+ */
+ private DFSInputStream getInputStream() {
+ return (DFSInputStream) (fileIn.getWrappedStream());
+ }
+
+ /**
+ * Returns statistics of the input stream's read operation: total bytes
+ * read, bytes read locally, bytes read in short-circuit (directly from file
+ * descriptor).
+ *
+ * @return an instance of ReadStatistics class
+ */
+ public ReadStatistics getReadStatistics() {
+ return getInputStream().getReadStatistics();
+ }
+
+ /**
+ * Constructs a ChunkRecordReader instance.
+ *
+ * @param job the job configuration
+ * @param split contains the file name, begin byte of the split and the
+ * bytes length
+ * @throws IOException if an I/O error occurs when accessing the file or
+ * creating input stream to read from it
+ */
+ public ChunkRecordReader(Configuration job, FileSplit split)
+ throws IOException {
+ maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ validateLength(maxLineLength);
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+ compressionCodecs = new CompressionCodecFactory(job);
+ codec = compressionCodecs.getCodec(file);
+
+ // open the file and seek to the start of the split
+ job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ final FileSystem fs = file.getFileSystem(job);
+ fs.setVerifyChecksum(false);
+ fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
+ fileLength = getInputStream().getFileLength();
+ if (isCompressedInput()) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fileIn, decompressor, start, end,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ in = new ChunkReader(cIn);
+ start = cIn.getAdjustedStart();
+ end = cIn.getAdjustedEnd();
+ filePosition = cIn; // take pos from compressed stream
+ } else {
+ in = new ChunkReader(codec.createInputStream(fileIn,
+ decompressor));
+ filePosition = fileIn;
+ }
+ } else {
+ fileIn.seek(start);
+ in = new ChunkReader(fileIn);
+ filePosition = fileIn;
+ }
+ /*
+ * If this is not the first split, we always throw away first record
+ * because we always (except the last split) read one extra line in
+ * next() method.
+ */
+ if (start != 0) {
+ start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
+ }
+ this.pos = start;
+ }
+
+ /**
+ * Used by the client of this class to create the 'key' output parameter for
+ * next() method.
+ *
+ * @return an instance of LongWritable
+ */
+ @Override
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ /**
+ * Used by the client of this class to create the 'value' output parameter
+ * for next() method.
+ *
+ * @return an instance of ChunkWritable
+ */
+ @Override
+ public ChunkWritable createValue() {
+ return new ChunkWritable();
+ }
+
+ /**
+ * Fetches the next data chunk from the file split. The size of the chunk is
+ * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
+ * apart from the other readers which will fetch one record and stop when
+ * reaching a record delimiter.
+ *
+ * @param key - output parameter. When method returns will contain the key -
+ * the number of the start byte of the chunk
+ * @param value - output parameter. When method returns will contain the
+ * value - the chunk, a byte array inside the ChunkWritable
+ * instance
+ * @return false - when end of split was reached
+ * @throws IOException if an I/O error occurred while reading the next chunk
+ * or line
+ */
+ @Override
+ public synchronized boolean next(LongWritable key, ChunkWritable value)
+ throws IOException {
+ /*
+ * Usually a record is spread between the end of current split and the
+ * beginning of next split. So when reading the last record in the split
+ * we usually need to cross over to the next split. This tricky logic is
+ * implemented in ChunkReader.readLine(). In order not to rewrite this
+ * logic we will read the last chunk in the split with readLine(). For a
+ * split of 120M, reading the last 1M line by line doesn't have a huge
+ * impact. Applying a factor to the last chunk to make sure we start
+ * before the last record.
+ */
+ float factor = 1.5f;
+ int limit = (int) (factor * CHUNK_SIZE);
+ long curPos = getFilePosition();
+ int newSize = 0;
+
+ while (curPos <= end) {
+ key.set(pos);
+
+ if ((end - curPos) > limit) {
+ newSize = in.readChunk(value, CHUNK_SIZE);
+ } else {
+ newSize = in.readLine(value,
+ Math.max(maxBytesToConsume(pos), maxLineLength));
+ }
+ if (newSize == 0) {
+ break;
+ }
+
+ pos += newSize;
+
+ if (pos == fileLength) { /*
+ * in case text file last character is not
+ * a linefeed
+ */
+ if (value.box[value.box.length - 1] != '\n') {
+ int newLen = value.box.length + 1;
+ byte[] tmp = new byte[newLen];
+ System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
+ tmp[newLen - 1] = '\n';
+ value.box = tmp;
+ }
+ }
+
+ return true;
+ }
+ /*
+ * if we got here, either newSize was 0 or curPos is bigger than end
+ */
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private long fileLength;
- private ChunkReader in;
- private FSDataInputStream fileIn;
- private final Seekable filePosition;
- private int maxLineLength;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private static final int CHUNK_SIZE = 1024 * 1024;
+ return false;
+ }
- /*
- * Translate the FSDataInputStream into a DFSInputStream
- */
- private DFSInputStream getInputStream() {
- return (DFSInputStream)(fileIn.getWrappedStream());
- }
-
- /**
- * Returns statistics of the input stream's read operation: total bytes read,
- * bytes read localy, bytes read in short-circuit (directly from file descriptor)
- * @return an instance of ReadStatistics class
- */
- public ReadStatistics getReadStatistics() {
- return getInputStream().getReadStatistics();
- }
+ /**
+ * Gets the progress within the split.
+ */
+ @Override
+ public synchronized float getProgress() throws IOException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getFilePosition() - start)
+ / (float) (end - start));
+ }
+ }
- /**
- * Constructs a ChunkRecordReader instance
- * @param job the job configuration
- * @param split contains the file name, begin byte of the split and the bytes length
- */
- public ChunkRecordReader(Configuration job, FileSplit split) throws IOException {
- maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
- validateLength(maxLineLength);
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
- compressionCodecs = new CompressionCodecFactory(job);
- codec = compressionCodecs.getCodec(file);
+ /**
+ * Returns the position of the unread tail of the file
+ *
+ * @return pos - start byte of the unread tail of the file
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
- // open the file and seek to the start of the split
- job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
- final FileSystem fs = file.getFileSystem(job);
- fs.setVerifyChecksum(false);
- fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
- fileLength = getInputStream().getFileLength();
- if (isCompressedInput()) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- final SplitCompressionInputStream cIn =
- ((SplittableCompressionCodec)codec).createInputStream(
- fileIn, decompressor, start, end,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
- in = new ChunkReader(cIn);
- start = cIn.getAdjustedStart();
- end = cIn.getAdjustedEnd();
- filePosition = cIn; // take pos from compressed stream
- } else {
- in = new ChunkReader(codec.createInputStream(fileIn, decompressor));
- filePosition = fileIn;
- }
- } else {
- fileIn.seek(start);
- in = new ChunkReader(fileIn);
- filePosition = fileIn;
- }
- /*
- * If this is not the first split, we always throw away first record
- * because we always (except the last split) read one extra line in
- * next() method.
- */
- if (start != 0) {
- start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
- }
- this.pos = start;
- }
-
- /**
- * Used by the client of this class to create the 'key' output parameter for next() method
- * @return an instance of LongWritable
- */
- @Override
- public LongWritable createKey() {
- return new LongWritable();
- }
-
- /**
- * Used by the client of this class to create the 'value' output parameter for next() method
- * @return an instance of ChunkWritable
- */
- @Override
- public ChunkWritable createValue() {
- return new ChunkWritable();
- }
-
- /**
- * Fetch the next data chunk from the file split. The size of the chunk is a class hardcoded
- * parameter - CHUNK_SIZE. This behaviour sets this reader apart from the other readers which
- * will fetch one record and stop when reaching a record delimiter.
- * @param key - output parameter. When method returns will contain the key - the number
- * of the start byte of the chunk
- * @param value - output parameter. When method returns will contain the value - the chunk,
- * a byte array inside the ChunkWritable instance
- * @return false - when end of split was reached
- */
- @Override
- public synchronized boolean next(LongWritable key, ChunkWritable value)
- throws IOException {
- /*
- * Usualy a record is spread between the end of current split and the beginning
- * of next split. So when reading the last record in the split we usually need to
- * cross over to the next split. This tricky logic is implemented in
- * ChunkReader.readLine().
- * In order not to rewrite this logic we will read the lust chunk in the split
- * with readLine(). For a split of 120M, reading the last 1M line by line doesn't
- * have a huge impact. Applying a factor to the last chunk to make sure we start
- * before the last record.
- */
- float factor = 1.5f;
- int limit = (int)(factor*CHUNK_SIZE);
- long curPos = getFilePosition();
- int newSize = 0;
-
- while (curPos <= end) {
- key.set(pos);
-
- if ((end - curPos) > limit) {
- newSize = in.readChunk(value, CHUNK_SIZE);
- }
- else {
- newSize = in.readLine(value, Math.max(maxBytesToConsume(pos), maxLineLength));
- }
- if (newSize == 0) {
- break;
- }
-
- pos += newSize;
-
- if (pos == fileLength) { /* in case text file last character is not a linefeed*/
- if (value.box[value.box.length - 1] != (int)'\n') {
- int newLen = value.box.length + 1;
- byte [] tmp = new byte [newLen];
- System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
- tmp[newLen - 1] = '\n';
- value.box = tmp;
- }
- }
-
- return true;
- }
- /*
- * if we got here, either newSize was 0 or curPos is bigger than end
- */
-
- return false;
- }
-
- /**
- * Get the progress within the split
- */
- @Override
- public synchronized float getProgress() throws IOException {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
- }
- }
-
- /**
- * @return pos - start byte of the unread tail of the file
- */
- public synchronized long getPos() throws IOException {
- return pos;
- }
-
- /**
- * Close the input stream
- */
- @Override
- public synchronized void close() throws IOException {
- try {
- if (in != null) {
- in.close();
- }
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
+ /**
+ * Closes the input stream.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+ }
- private void validateLength(int maxLineLength) {
- if (maxLineLength <= 0)
- throw new IllegalArgumentException("maxLineLength must be a positive value");
- }
+ private void validateLength(int maxLineLength) {
+ if (maxLineLength <= 0)
+ throw new IllegalArgumentException(
+ "maxLineLength must be a positive value");
+ }
- private boolean isCompressedInput() {
- return (codec != null);
- }
+ private boolean isCompressedInput() {
+ return (codec != null);
+ }
- private int maxBytesToConsume(long pos) {
- return isCompressedInput()
- ? Integer.MAX_VALUE
- : (int) Math.min(Integer.MAX_VALUE, end - pos);
- }
+ private int maxBytesToConsume(long pos) {
+ return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
+ Integer.MAX_VALUE, end - pos);
+ }
- private long getFilePosition() throws IOException {
- long retVal;
- if (isCompressedInput() && null != filePosition) {
- retVal = filePosition.getPos();
- } else {
- retVal = pos;
- }
- return retVal;
- }
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput() && null != filePosition) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
} // class ChunkRecordReader
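
To make the RecordReader contract above concrete, here is a minimal usage sketch. It is hypothetical (the file path, split size and demo class name are assumptions), but the constructor and the createKey()/createValue()/next()/getProgress()/close() calls match the class in this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.FileSplit;

    import com.pivotal.pxf.plugins.hdfs.ChunkRecordReader;
    import com.pivotal.pxf.plugins.hdfs.ChunkWritable;

    public class ChunkRecordReaderDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical split covering the first 128MB of an HDFS file.
            FileSplit split = new FileSplit(new Path("hdfs:///tmp/sample.txt"),
                    0, 128L * 1024 * 1024, (String[]) null);

            ChunkRecordReader reader = new ChunkRecordReader(conf, split);
            LongWritable key = reader.createKey();      // start byte of each chunk
            ChunkWritable value = reader.createValue(); // chunk payload (byte[] box)
            try {
                while (reader.next(key, value)) {
                    // Each call fills value.box with up to ~CHUNK_SIZE bytes;
                    // chunk boundaries are not aligned to record boundaries.
                    System.out.println("chunk at byte " + key.get() + ", length "
                            + value.box.length + ", progress " + reader.getProgress());
                }
            } finally {
                reader.close();
            }
        }
    }
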
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
index b19b3fd..a0a8b17 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
@@ -2,7 +2,6 @@ package com.pivotal.pxf.plugins.hdfs;
import java.io.DataOutput;
import java.io.DataInput;
-import java.io.IOException;
import java.lang.UnsupportedOperationException;
import org.apache.hadoop.io.Writable;
@@ -13,29 +12,28 @@ import org.apache.hadoop.io.Writable;
*/
public class ChunkWritable implements Writable {
public byte [] box;
-
+
/**
- * Serialize the fields of this object to <code>out</code>.
+ * Serializes the fields of this object to <code>out</code>.
*
* @param out <code>DataOutput</code> to serialize this object into.
- * @throws IOException
+ * @throws UnsupportedOperationException this function is not supported
*/
@Override
public void write(DataOutput out) {
throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
}
-
+
/**
- * Deserialize the fields of this object from <code>in</code>.
+ * Deserializes the fields of this object from <code>in</code>.
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deserialize this object from.
- * @throws IOException
+ * @throws UnsupportedOperationException this function is not supported
*/
@Override
public void readFields(DataInput in) {
throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
}
-
}
\ No newline at end of file
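
Because both Writable methods are stubbed out, ChunkWritable acts as a plain in-memory byte carrier. A small hypothetical sketch (the demo class name and sample data are assumptions; the public box field and the unsupported write()/readFields() behaviour come from the class above):

    import com.pivotal.pxf.plugins.hdfs.ChunkWritable;

    public class ChunkWritableDemo {
        public static void main(String[] args) {
            // Callers read and write the public 'box' field directly; the
            // Writable write()/readFields() methods above deliberately throw
            // UnsupportedOperationException, so instances are never serialized
            // through the Writable framework.
            ChunkWritable chunk = new ChunkWritable();
            chunk.box = "one record\nanother record\n".getBytes();
            System.out.println("chunk holds " + chunk.box.length + " bytes");
        }
    }
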
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
index f18ecef..0f8f908 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.util.ArrayList;
-
/**
* Analyzer class for HDFS data resources
*
@@ -33,10 +32,10 @@ public class HdfsAnalyzer extends Analyzer {
private Log Log;
/**
- * Constructs an HdfsAnalyzer object
- *
+ * Constructs an HdfsAnalyzer object.
+ *
* @param inputData all input parameters coming from the client
- * @throws IOException
+ * @throws IOException if HDFS file system cannot be retrieved
*/
public HdfsAnalyzer(InputData inputData) throws IOException {
super(inputData);
@@ -49,11 +48,13 @@ public class HdfsAnalyzer extends Analyzer {
/**
* Collects a number of basic statistics based on an estimate. Statistics
* are: number of records, number of hdfs blocks and hdfs block size.
- *
- * @param datapath path is a data source URI that can appear as a file
+ *
+ * @param datapath path is a data source URI that can appear as a file
* name, a directory name or a wildcard pattern
- * @return statistics in json format
- * @throws Exception
+ * @return statistics in JSON format
+ * @throws Exception if path is wrong, its metadata cannot be retrieved
+ * from file system, or if scanning the first block
+ * using the accessor failed
*/
@Override
public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
@@ -89,8 +90,8 @@ public class HdfsAnalyzer extends Analyzer {
return stats;
}
- /*
- * Calculate the number of tuples in a split (block)
+ /**
+ * Calculates the number of tuples in a split (block).
* Reads one block from HDFS. Exception during reading will
* filter upwards and handled in AnalyzerResource
*/
@@ -129,7 +130,7 @@ public class HdfsAnalyzer extends Analyzer {
PxfInputFormat.setInputPaths(jobConf, path);
InputSplit[] splits = fformat.getSplits(jobConf, 1);
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
-
+
// remove empty splits
if (splits != null) {
for (InputSplit split : splits) {
@@ -138,7 +139,7 @@ public class HdfsAnalyzer extends Analyzer {
}
}
}
-
- return result;
+
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index da02590..4a43c5f 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -15,19 +15,18 @@ import java.io.InputStream;
import java.net.URI;
/**
- * Base class for enforcing the complete access of a file in one accessor.
+ * Base class for enforcing the complete access of a file in one accessor.
* Since we are not accessing the file using the splittable API, but instead are
* using the "simple" stream API, it means that we cannot fetch different parts
* (splits) of the file in different segments. Instead each file access brings
- * the complete file. And, if several segments would access the same file, then
+ * the complete file. And, if several segments would access the same file, then
* each one will return the whole file and we will observe in the query result,
- * each record appearing number_of_segments times. To avoid this we will only
- * have one segment (segment 0) working for this case - enforced with
- * isWorkingSegment() method. Naturally this is the less recommended working
+ * each record appearing number_of_segments times. To avoid this we will only
+ * have one segment (segment 0) working for this case - enforced with
+ * isWorkingSegment() method. Naturally this is the less recommended working
* mode since we are not making use of segment parallelism. HDFS accessors for
* a specific file type should inherit from this class only if the file they are
* reading does not support splitting: a protocol-buffer file, regular file, ...
- *
*/
public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
private Configuration conf = null;
@@ -36,10 +35,10 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
/**
* Constructs a HdfsAtomicDataAccessor object.
+ *
* @param input all input parameters coming from the client
- * @throws Exception
*/
- public HdfsAtomicDataAccessor(InputData input) throws Exception {
+ public HdfsAtomicDataAccessor(InputData input) {
// 0. Hold the configuration data
super(input);
@@ -52,10 +51,11 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
/**
* Opens the file using the non-splittable API for HADOOP HDFS file access
* This means that instead of using a FileInputFormat for access, we use a
- * Java stream
- *
+ * Java stream.
+ *
* @return true for successful file open, false otherwise
*/
+ @Override
public boolean openForRead() throws Exception {
if (!isWorkingSegment()) {
return false;
@@ -70,8 +70,10 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
/**
* Fetches one record from the file.
- * @return a {@link OneRow} record as a Java object. returns null if none.
+ *
+ * @return a {@link OneRow} record as a Java object. Returns null if none.
*/
+ @Override
public OneRow readNextObject() throws IOException {
if (!isWorkingSegment()) {
return null;
@@ -83,6 +85,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
/**
* Closes the access stream when finished reading the file
*/
+ @Override
public void closeForRead() throws Exception {
if (!isWorkingSegment()) {
return;
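
The openForRead()/readNextObject()/closeForRead() methods above follow the ReadAccessor contract. Below is a hedged sketch of the read loop a caller might run against any such accessor; it assumes ReadAccessor lives in com.pivotal.pxf.api next to OneRow and WriteAccessor (its import is not visible in this diff) and the helper class name is hypothetical:

    import com.pivotal.pxf.api.OneRow;
    import com.pivotal.pxf.api.ReadAccessor;   // package assumed, not shown in this diff

    public final class ReadLoop {
        // Drains one accessor. For HdfsAtomicDataAccessor subclasses, segments
        // other than segment 0 get 'false' from openForRead() and skip the file,
        // which is how duplicate rows are avoided.
        public static long drain(ReadAccessor accessor) throws Exception {
            if (!accessor.openForRead()) {
                return 0; // nothing to read on this segment
            }
            long rows = 0;
            try {
                OneRow row;
                while ((row = accessor.readNextObject()) != null) {
                    rows++; // a real pipeline would hand 'row' to a resolver here
                }
            } finally {
                accessor.closeForRead();
            }
            return rows;
        }
    }
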
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
index ab4aa67..1bf5aab 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -15,43 +15,43 @@ import java.io.IOException;
import java.util.List;
/**
- * Fragmenter class for HDFS data resources
+ * Fragmenter class for HDFS data resources.
*
- * Given an HDFS data source (a file, directory, or wild card pattern)
- * divide the data into fragments and return a list of them along with
- * a list of host:port locations for each.
+ * Given an HDFS data source (a file, directory, or wild card pattern) divide
+ * the data into fragments and return a list of them along with a list of
+ * host:port locations for each.
*/
public class HdfsDataFragmenter extends Fragmenter {
private JobConf jobConf;
/**
- * Constructs an HdfsDataFragmenter object
+ * Constructs an HdfsDataFragmenter object.
+ *
* @param md all input parameters coming from the client
- * @throws IOException
*/
- public HdfsDataFragmenter(InputData md) throws IOException {
+ public HdfsDataFragmenter(InputData md) {
super(md);
jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
}
- /*
- * path is a data source URI that can appear as a file
- * name, a directory name or a wildcard returns the data
- * fragments in json format
+ /**
+ * Gets the fragments for a data source URI that can appear as a file name,
+ * a directory name or a wildcard. Returns the data fragments in JSON
+ * format.
*/
@Override
public List<Fragment> getFragments() throws Exception {
- String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
InputSplit[] splits = getSplits(new Path(absoluteDataPath));
- for (InputSplit split : splits != null ? splits : new InputSplit[]{}) {
+ for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
FileSplit fsp = (FileSplit) split;
- /*
- * HD-2547: If the file is empty, an empty split is returned:
- * no locations and no length.
- */
+ /*
+ * HD-2547: If the file is empty, an empty split is returned: no
+ * locations and no length.
+ */
if (fsp.getLength() <= 0) {
continue;
}
@@ -59,10 +59,10 @@ public class HdfsDataFragmenter extends Fragmenter {
String filepath = fsp.getPath().toUri().getPath();
String[] hosts = fsp.getLocations();
- /*
- * metadata information includes: file split's
- * start, length and hosts (locations).
- */
+ /*
+ * metadata information includes: file split's start, length and
+ * hosts (locations).
+ */
byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
fragments.add(fragment);
@@ -76,5 +76,4 @@ public class HdfsDataFragmenter extends Fragmenter {
PxfInputFormat.setInputPaths(jobConf, path);
return format.getSplits(jobConf, 1);
}
-
}
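
The loop in getFragments() above boils down to: drop empty splits, then describe each remaining split as a Fragment carrying its path, hosts and serialized metadata. A standalone sketch of that per-split step follows; the Fragment package and the helper class name are assumptions, while HdfsUtilities.prepareFragmentMetadata() and the FileSplit accessors are taken from this commit:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.mapred.FileSplit;

    import com.pivotal.pxf.api.Fragment;   // package assumed, import not shown in this diff
    import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;

    public final class FragmentBuilder {
        // Mirrors the per-split work in getFragments(): skip empty splits, then
        // describe each remaining split by its path, host list and the serialized
        // (start, length, hosts) metadata produced by HdfsUtilities.
        public static List<Fragment> toFragments(FileSplit[] splits) throws IOException {
            List<Fragment> fragments = new ArrayList<Fragment>();
            for (FileSplit fsp : splits) {
                if (fsp.getLength() <= 0) {
                    continue; // HD-2547: empty files yield empty splits
                }
                String filepath = fsp.getPath().toUri().getPath();
                String[] hosts = fsp.getLocations();
                byte[] metadata = HdfsUtilities.prepareFragmentMetadata(fsp);
                fragments.add(new Fragment(filepath, hosts, metadata));
            }
            return fragments;
        }
    }
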
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 8f49d48..744342d 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -16,10 +16,11 @@ import java.util.ListIterator;
* Accessor for accessing a splittable HDFS data sources. HDFS will divide the
* file into splits based on an internal decision (by default, the block size is
* also the split size).
- *
+ *
* Accessors that require such base functionality should extend this class.
*/
-public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadAccessor {
+public abstract class HdfsSplittableDataAccessor extends Plugin implements
+ ReadAccessor {
protected Configuration conf = null;
protected RecordReader<Object, Object> reader = null;
protected InputFormat<?, ?> inputFormat = null;
@@ -29,12 +30,12 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
/**
* Constructs an HdfsSplittableDataAccessor
- *
+ *
* @param input all input parameters coming from the client request
* @param inFormat the HDFS {@link InputFormat} the caller wants to use
- * @throws Exception
*/
- public HdfsSplittableDataAccessor(InputData input, InputFormat<?, ?> inFormat) throws Exception {
+ public HdfsSplittableDataAccessor(InputData input,
+ InputFormat<?, ?> inFormat) {
super(input);
inputFormat = inFormat;
@@ -46,11 +47,12 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
}
/**
- * Fetches the requested fragment (file split) for the current client
+ * Fetches the requested fragment (file split) for the current client
* request, and sets a record reader for the job.
- *
+ *
* @return true if succeeded, false if no more splits to be read
*/
+ @Override
public boolean openForRead() throws Exception {
LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
@@ -62,23 +64,28 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
}
/**
- * Specialized accessors will override this method and implement their own
+ * Specialized accessors will override this method and implement their own
* recordReader. For example, a plain delimited text accessor may want to
- * return a LineRecordReader.
- *
+ * return a LineRecordReader.
+ *
* @param jobConf the hadoop jobconf to use for the selected InputFormat
* @param split the input split to be read by the accessor
- * @return a recordreader to be used for reading the data records of the split
- * @throws IOException
+ * @return a recordreader to be used for reading the data records of the
+ * split
+ * @throws IOException if recordreader could not be created
*/
- abstract protected Object getReader(JobConf jobConf, InputSplit split) throws IOException;
+ abstract protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException;
- /*
- * getNextSplit
- * Sets the current split and initializes a RecordReader who feeds from the split
+ /**
+ * Sets the current split and initializes a RecordReader that feeds from the
+ * split.
+ *
+ * @return true if there is a split to read
+ * @throws IOException if record reader could not be created
*/
@SuppressWarnings(value = "unchecked")
- protected boolean getNextSplit() throws IOException {
+ protected boolean getNextSplit() throws IOException {
if (!iter.hasNext()) {
return false;
}
@@ -90,36 +97,40 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
return true;
}
- /*
- * readNextObject
- * Fetches one record from the file. The record is returned as a Java object.
+ /**
+ * Fetches one record from the file. The record is returned as a Java
+ * object.
*/
@Override
public OneRow readNextObject() throws IOException {
-
- if (!reader.next(key, data)) { // if there is one more record in the current split
- if (getNextSplit()) {// the current split is exhausted. try to move to the next split.
- if (!reader.next(key, data)) {// read the first record of the new split
- return null; // make sure we return nulls
+ // if there is one more record in the current split
+ if (!reader.next(key, data)) {
+ // the current split is exhausted. try to move to the next split
+ if (getNextSplit()) {
+ // read the first record of the new split
+ if (!reader.next(key, data)) {
+ // make sure we return nulls
+ return null;
}
} else {
- return null; // make sure we return nulls
+ // make sure we return nulls
+ return null;
}
}
- /*
- * if neither condition was met, it means we already read all the
- * records in all the splits, and in this call record variable was not
- * set, so we return null and thus we are signaling end of records
- * sequence
- */
+ /*
+ * if neither condition was met, it means we already read all the
+ * records in all the splits, and in this call record variable was not
+ * set, so we return null and thus we are signaling end of records
+ * sequence
+ */
return new OneRow(key, data);
}
- /*
- * closeForRead
+ /**
* When user finished reading the file, it closes the RecordReader
*/
+ @Override
public void closeForRead() throws Exception {
if (reader != null) {
reader.close();
@@ -129,7 +140,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
@Override
public boolean isThreadSafe() {
return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
- inputData.getUserProperty("COMPRESSION_CODEC"));
+ inputData.getUserProperty("COMPRESSION_CODEC"));
}
}
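
The javadoc above notes that a plain delimited text accessor might return a LineRecordReader from getReader(). A minimal, hypothetical subclass along those lines could look like the sketch below; it assumes only the inherited inputFormat and jobConf members that LineBreakAccessor (next file) also relies on:

    import java.io.IOException;

    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.TextInputFormat;

    import com.pivotal.pxf.api.utilities.InputData;
    import com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor;

    public class SimpleLineAccessor extends HdfsSplittableDataAccessor {
        public SimpleLineAccessor(InputData input) {
            super(input, new TextInputFormat());
            // same configuration step LineBreakAccessor performs on its InputFormat
            ((TextInputFormat) inputFormat).configure(jobConf);
        }

        @Override
        protected Object getReader(JobConf jobConf, InputSplit split)
                throws IOException {
            // one record per line; record boundaries handled by LineRecordReader
            return new LineRecordReader(jobConf, (FileSplit) split);
        }
    }
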
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
index 28b125c..32002a1 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
@@ -16,39 +16,38 @@ import org.apache.hadoop.mapred.*;
import java.io.DataOutputStream;
import java.io.IOException;
-
/**
- * A PXF Accessor for reading delimited plain text records
+ * A PXF Accessor for reading delimited plain text records.
*/
-public class LineBreakAccessor extends HdfsSplittableDataAccessor implements WriteAccessor {
+public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
+ WriteAccessor {
private DataOutputStream dos;
private FSDataOutputStream fsdos;
private Configuration conf;
private FileSystem fs;
private Path file;
- private Log Log;
+ private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
/**
- * Constructs a LineReaderAccessor
- *
+ * Constructs a LineReaderAccessor.
+ *
* @param input all input parameters coming from the client request
- * @throws Exception
*/
- public LineBreakAccessor(InputData input) throws Exception {
+ public LineBreakAccessor(InputData input) {
super(input, new TextInputFormat());
((TextInputFormat) inputFormat).configure(jobConf);
-
- Log = LogFactory.getLog(LineBreakAccessor.class);
}
@Override
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
return new ChunkRecordReader(jobConf, (FileSplit) split);
}
-
- /*
- * opens file for write
+
+ /**
+ * Opens file for write.
*/
+ @Override
public boolean openForWrite() throws Exception {
String fileName = inputData.getDataSource();
@@ -68,7 +67,8 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
file = new Path(fileName);
if (fs.exists(file)) {
- throw new IOException("file " + file.toString() + " already exists, can't write data");
+ throw new IOException("file " + file.toString()
+ + " already exists, can't write data");
}
org.apache.hadoop.fs.Path parent = file.getParent();
if (!fs.exists(parent)) {
@@ -83,10 +83,11 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
}
/*
- * Create output stream from given file.
- * If compression codec is provided, wrap it around stream.
+ * Creates output stream from given file. If compression codec is provided,
+ * wrap it around stream.
*/
- private void createOutputStream(Path file, CompressionCodec codec) throws IOException {
+ private void createOutputStream(Path file, CompressionCodec codec)
+ throws IOException {
fsdos = fs.create(file, false);
if (codec != null) {
dos = new DataOutputStream(codec.createOutputStream(fsdos));
@@ -96,28 +97,30 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
}
- /*
- * write row into stream
+ /**
+ * Writes row into stream.
*/
+ @Override
public boolean writeNextObject(OneRow onerow) throws Exception {
dos.write((byte[]) onerow.getData());
return true;
}
-
- /*
- * close the output stream after done writing
+
+ /**
+ * Closes the output stream after done writing.
*/
+ @Override
public void closeForWrite() throws Exception {
if ((dos != null) && (fsdos != null)) {
Log.debug("Closing writing stream for path " + file);
dos.flush();
/*
* From release 0.21.0 sync() is deprecated in favor of hflush(),
- * which only guarantees that new readers will see all data written
- * to that point, and hsync(), which makes a stronger guarantee that
- * the operating system has flushed the data to disk (like POSIX
- * fsync), although data may still be in the disk cache.
- */
+ * which only guarantees that new readers will see all data written
+ * to that point, and hsync(), which makes a stronger guarantee that
+ * the operating system has flushed the data to disk (like POSIX
+ * fsync), although data may still be in the disk cache.
+ */
fsdos.hsync();
dos.close();
}
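
openForWrite(), writeNextObject() and closeForWrite() above implement the WriteAccessor side. A hedged sketch of a caller-side write loop follows; the helper class name is an assumption, and because writeNextObject() writes the raw bytes as-is, each row is expected to carry its own trailing newline:

    import com.pivotal.pxf.api.OneRow;
    import com.pivotal.pxf.api.WriteAccessor;

    public final class WriteLoop {
        // Pushes pre-formatted text rows through a WriteAccessor such as
        // LineBreakAccessor: open once, write each row's raw bytes, then close
        // (which flushes via hsync() in the accessor above).
        public static void writeAll(WriteAccessor accessor, byte[][] rows) throws Exception {
            if (!accessor.openForWrite()) {
                throw new IllegalStateException("could not open destination for write");
            }
            try {
                for (byte[] row : rows) {
                    // writeNextObject() writes the bytes as-is, so each row is
                    // expected to already end with a newline.
                    accessor.writeNextObject(new OneRow(null, row));
                }
            } finally {
                accessor.closeForWrite();
            }
        }
    }
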
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
index 3fc0f5e..13880b8 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
@@ -16,12 +16,11 @@ public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
private BufferedReader reader;
/**
- * Constructs a QuotedLineBreakAccessor
- *
+ * Constructs a QuotedLineBreakAccessor.
+ *
* @param input all input parameters coming from the client request
- * @throws Exception
*/
- public QuotedLineBreakAccessor(InputData input) throws Exception {
+ public QuotedLineBreakAccessor(InputData input) {
super(input);
}
@@ -34,8 +33,7 @@ public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
return true;
}
- /*
- * readNextObject
+ /**
* Fetches one record (maybe partial) from the file. The record is returned as a Java object.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
index d3e7292..5f3f3dd 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
@@ -4,6 +4,7 @@ import com.pivotal.pxf.api.OneRow;
import com.pivotal.pxf.api.WriteAccessor;
import com.pivotal.pxf.api.utilities.InputData;
import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -24,7 +25,8 @@ import java.util.EnumSet;
/**
* A PXF Accessor for reading and writing Sequence File records
*/
-public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements WriteAccessor {
+public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
+ WriteAccessor {
private Configuration conf;
private FileContext fc;
@@ -34,27 +36,24 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
private SequenceFile.Writer writer;
private LongWritable defaultKey; // used when recordkey is not defined
- private Log Log;
+ private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);
/**
- * Constructs a SequenceFileAccessor
- *
+ * Constructs a SequenceFileAccessor.
+ *
* @param input all input parameters coming from the client request
- * @throws Exception
*/
- public SequenceFileAccessor(InputData input) throws Exception {
-
- super(input,
- new SequenceFileInputFormat<Writable, Writable>());
-
- Log = LogFactory.getLog(SequenceFileAccessor.class);
+ public SequenceFileAccessor(InputData input) {
+ super(input, new SequenceFileInputFormat<Writable, Writable>());
}
- /*
- * Override virtual method to create specialized record reader
+ /**
+ * Overrides virtual method to create specialized record reader
*/
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
- return new SequenceFileRecordReader(jobConf, (FileSplit) split);
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
+ return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
}
@Override
@@ -74,7 +73,8 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
defaultKey = new LongWritable(inputData.getSegmentId());
if (fs.exists(file)) {
- throw new IOException("file " + file + " already exists, can't write data");
+ throw new IOException("file " + file
+ + " already exists, can't write data");
}
parent = file.getParent();
if (!fs.exists(parent)) {
@@ -87,10 +87,11 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
}
/**
- * Compression: based on compression codec and compression type (default value RECORD).
- * If there is no codec, compression type is ignored, and NONE is used.
+ * Compression: based on compression codec and compression type (default
+ * value RECORD). If there is no codec, compression type is ignored, and
+ * NONE is used.
*
- * @param inputData - container where compression codec and type are held.
+ * @param inputData - container where compression codec and type are held
*/
private void getCompressionCodec(InputData inputData) {
@@ -106,21 +107,23 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
try {
compressionType = CompressionType.valueOf(parsedCompressType);
} catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Illegal value for compression type " +
- "'" + parsedCompressType + "'");
+ throw new IllegalArgumentException(
+ "Illegal value for compression type " + "'"
+ + parsedCompressType + "'");
}
if (compressionType == null) {
- throw new IllegalArgumentException("Compression type must be defined");
+ throw new IllegalArgumentException(
+ "Compression type must be defined");
}
- Log.debug("Compression ON: " +
- "compression codec: " + userCompressCodec +
- ", compression type: " + compressionType);
+ Log.debug("Compression ON: " + "compression codec: "
+ + userCompressCodec + ", compression type: "
+ + compressionType);
}
}
/*
- * Parse compression type for sequence file. If null, default to RECORD.
+ * Parses compression type for sequence file. If null, default to RECORD.
* Allowed values: RECORD, BLOCK.
*/
private String parseCompressionType(String compressType) {
@@ -133,13 +136,15 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
}
if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
- throw new IllegalArgumentException("Illegal compression type 'NONE'. " +
- "For disabling compression remove COMPRESSION_CODEC parameter.");
+ throw new IllegalArgumentException(
+ "Illegal compression type 'NONE'. "
+ + "For disabling compression remove COMPRESSION_CODEC parameter.");
}
- if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD) &&
- !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
- throw new IllegalArgumentException("Illegal compression type '" + compressType + "'");
+ if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
+ && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
+ throw new IllegalArgumentException("Illegal compression type '"
+ + compressType + "'");
}
return compressType.toUpperCase();
@@ -165,11 +170,13 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
// init writer on first approach here, based on onerow.getData type
// TODO: verify data is serializable.
if (writer == null) {
- Class valueClass = value.getClass();
- Class keyClass = (key == null) ? LongWritable.class : key.getClass();
+ Class<? extends Writable> valueClass = value.getClass();
+ Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
+ : key.getClass();
// create writer - do not allow overwriting existing file
- writer = SequenceFile.createWriter(fc, conf, file, keyClass, valueClass,
- compressionType, codec, new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
+ writer = SequenceFile.createWriter(fc, conf, file, keyClass,
+ valueClass, compressionType, codec,
+ new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
}
try {
@@ -188,20 +195,21 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
writer.sync();
/*
* From release 0.21.0 sync() is deprecated in favor of hflush(),
- * which only guarantees that new readers will see all data written to that point,
- * and hsync(), which makes a stronger guarantee that the operating system has flushed
- * the data to disk (like POSIX fsync), although data may still be in the disk cache.
- */
+ * which only guarantees that new readers will see all data written
+ * to that point, and hsync(), which makes a stronger guarantee that
+ * the operating system has flushed the data to disk (like POSIX
+ * fsync), although data may still be in the disk cache.
+ */
writer.hsync();
writer.close();
}
}
- public CompressionType getCompressionType() {
- return compressionType;
- }
+ public CompressionType getCompressionType() {
+ return compressionType;
+ }
- public CompressionCodec getCodec() {
- return codec;
- }
+ public CompressionCodec getCodec() {
+ return codec;
+ }
}
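
For reference, the COMPRESSION_TYPE rules enforced by parseCompressionType() across the hunks above can be restated in one place. This is a sketch, assuming the COMPRESSION_TYPE_* constants are the literal strings NONE, RECORD and BLOCK:

    import org.apache.hadoop.io.SequenceFile.CompressionType;

    public final class CompressionTypeRule {
        // Restates the parseCompressionType() rules above: null defaults to RECORD,
        // NONE is rejected with a pointer to COMPRESSION_CODEC, and only RECORD or
        // BLOCK (case-insensitive) are accepted.
        public static CompressionType parse(String compressType) {
            if (compressType == null) {
                return CompressionType.RECORD;
            }
            if (compressType.equalsIgnoreCase("NONE")) {
                throw new IllegalArgumentException("Illegal compression type 'NONE'. "
                        + "For disabling compression remove COMPRESSION_CODEC parameter.");
            }
            if (!compressType.equalsIgnoreCase("RECORD")
                    && !compressType.equalsIgnoreCase("BLOCK")) {
                throw new IllegalArgumentException("Illegal compression type '"
                        + compressType + "'");
            }
            return CompressionType.valueOf(compressType.toUpperCase());
        }
    }
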
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
index 588792b..7ace4c8 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
@@ -15,8 +15,8 @@ import java.util.List;
import static com.pivotal.pxf.api.io.DataType.VARCHAR;
/**
- * StringPassResolver handles "deserialization" and serialization of
- * String records. StringPassResolver implements IReadResolver and
+ * StringPassResolver handles "deserialization" and serialization of
+ * String records. StringPassResolver implements IReadResolver and
* IWriteResolver interfaces. Returns strings as-is.
*/
public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
@@ -24,20 +24,19 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
private OneRow oneRow;
/**
- * Constructs a StringPassResolver
- *
+ * Constructs a StringPassResolver.
+ *
* @param inputData input all input parameters coming from the client request
- * @throws Exception
*/
- public StringPassResolver(InputData inputData) throws Exception {
+ public StringPassResolver(InputData inputData) {
super(inputData);
oneRow = new OneRow();
this.inputData = inputData;
}
- /*
- * getFields returns a list of the fields of one record.
- * Each record field is represented by a OneField item.
+ /**
+ * Returns a list of the fields of one record.
+ * Each record field is represented by a {@link OneField} item.
* OneField item contains two fields: an integer representing the field type and a Java
* Object representing the field value.
*/
@@ -45,7 +44,7 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
public List<OneField> getFields(OneRow onerow) {
/*
* This call forces a whole text line into a single varchar field and replaces
- * the proper field separation code can be found in previous revisions. The reasons
+ * the proper field separation code can be found in previous revisions. The reasons
* for doing so as this point are:
* 1. performance
* 2. desire to not replicate text parsing logic from the backend into java
@@ -61,8 +60,8 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
return record;
}
- /*
- * Creates a OneRow object from the singleton list
+ /**
+ * Creates a OneRow object from the singleton list.
*/
@Override
public OneRow setFields(List<OneField> record) throws Exception {
@@ -74,5 +73,3 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
return oneRow;
}
}
-
-
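
Since StringPassResolver implements both resolver interfaces, a caller can round-trip a row through it: getFields() expands a OneRow into a singleton VARCHAR field list and setFields() folds it back. The sketch below assumes ReadResolver, WriteResolver and OneField live in com.pivotal.pxf.api alongside OneRow (only OneRow's package is confirmed elsewhere in this diff) and the helper class name is hypothetical:

    import java.util.List;

    import com.pivotal.pxf.api.OneField;       // package assumed
    import com.pivotal.pxf.api.OneRow;
    import com.pivotal.pxf.api.ReadResolver;   // package assumed
    import com.pivotal.pxf.api.WriteResolver;  // package assumed

    public final class ResolverRoundTrip {
        // For StringPassResolver the round trip is a no-op on the payload:
        // getFields() wraps the whole text line in a single VARCHAR OneField,
        // and setFields() turns that singleton list back into a OneRow.
        public static OneRow roundTrip(ReadResolver read, WriteResolver write, OneRow row)
                throws Exception {
            List<OneField> fields = read.getFields(row);
            return write.setFields(fields);
        }
    }
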
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
index 2913433..e9df907 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
@@ -23,11 +23,11 @@ import java.util.List;
import static com.pivotal.pxf.api.io.DataType.*;
/**
- * WritableResolver handles serialization and deserialization of records
+ * WritableResolver handles serialization and deserialization of records
* that were serialized using Hadoop's Writable serialization framework.
- *
- * A field named 'recordkey' is treated as a key of the given row, and not as
- * part of the data schema. @See RecordkeyAdapter.
+ *
+ * A field named 'recordkey' is treated as a key of the given row, and not as
+ * part of the data schema. See {@link RecordkeyAdapter}.
*/
public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
private static final int RECORDKEY_UNDEFINED = -1;
@@ -40,10 +40,11 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
/**
- * Constructs a WritableResolver
- *
+ * Constructs a WritableResolver.
+ *
* @param input all input parameters coming from the client
- * @throws Exception
+ * @throws Exception if schema file is missing, cannot be found in
+ * classpath or fails to instantiate
*/
public WritableResolver(InputData input) throws Exception {
super(input);
@@ -77,7 +78,6 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
" type: " + javaType + ", " +
(isArray(javaType) ? "Array" : "Primitive") + ", " +
(isPrivate ? "Private" : "accessible") + " field");
-
}
}
}
@@ -86,12 +86,6 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
return (javaType.startsWith("[") && !"[B".equals(javaType));
}
- /*
- * getFields returns a list of the fields of one record.
- * Each record field is represented by a OneField item.
- * OneField item contains two fields: an integer representing the field type and a Java
- * Object representing the field value.
- */
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
userObject = onerow.getData();
@@ -168,8 +162,8 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
}
}
- /*
- * Sets customWritable fields and creates a OneRow object
+ /**
+ * Sets customWritable fields and creates a OneRow object.
*/
@Override
public OneRow setFields(List<OneField> record) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index 03fe94c..c7fe103 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -26,20 +26,20 @@ import java.io.*;
import java.util.List;
/**
- * HdfsUtilities class exposes helper methods for PXF classes
+ * HdfsUtilities class exposes helper methods for PXF classes.
*/
public class HdfsUtilities {
private static Log Log = LogFactory.getLog(HdfsUtilities.class);
private static Configuration config = new Configuration();
- private static CompressionCodecFactory factory =
- new CompressionCodecFactory(config);
+ private static CompressionCodecFactory factory = new CompressionCodecFactory(
+ config);
/**
- * Hdfs data sources are absolute data paths. Method ensures
- * that dataSource begins with '/'
+ * Hdfs data sources are absolute data paths. Method ensures that dataSource
+ * begins with '/'.
*
* @param dataSource The HDFS path to a file or directory of interest.
- * Retrieved from the client request.
+ * Retrieved from the client request.
* @return an absolute data path
*/
public static String absoluteDataPath(String dataSource) {
@@ -49,20 +49,22 @@ public class HdfsUtilities {
/*
* Helper routine to get a compression codec class
*/
- private static Class<? extends CompressionCodec> getCodecClass(
- Configuration conf, String name) {
+ private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
+ String name) {
Class<? extends CompressionCodec> codecClass;
try {
- codecClass = conf.getClassByName(name).asSubclass(CompressionCodec.class);
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
} catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Compression codec " + name + " was not found.", e);
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
}
return codecClass;
}
/**
- * Helper routine to get compression codec through reflection
+ * Helper routine to get compression codec through reflection.
*
* @param conf configuration used for reflection
* @param name codec name
@@ -73,7 +75,7 @@ public class HdfsUtilities {
}
/**
- * Helper routine to get compression codec class by path (file suffix)
+ * Helper routine to get compression codec class by path (file suffix).
*
* @param path path of file to get codec for
* @return matching codec class for the path. null if no codec is needed.
@@ -91,8 +93,8 @@ public class HdfsUtilities {
}
/**
- * Returns true if the needed codec is splittable.
- * If no codec is needed returns true as well.
+ * Returns true if the needed codec is splittable. If no codec is needed
+ * returns true as well.
*
* @param path path of the file to be read
* @return if the codec needed for reading the specified path is splittable.
@@ -110,30 +112,32 @@ public class HdfsUtilities {
/**
* Checks if requests should be handled in a single thread or not.
*
- * @param dataDir hdfs path to the data source
+ * @param dataDir hdfs path to the data source
* @param compCodec the fully qualified name of the compression codec
* @return if the request can be run in multi-threaded mode.
*/
public static boolean isThreadSafe(String dataDir, String compCodec) {
- Class<? extends CompressionCodec> codecClass = (compCodec != null)
- ? HdfsUtilities.getCodecClass(config, compCodec)
- : HdfsUtilities.getCodecClassByPath(dataDir);
+ Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
+ config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
/* bzip2 codec is not thread safe */
return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
}
/**
- * Prepare byte serialization of a file split information
- * (start, length, hosts) using {@link ObjectOutputStream}.
+ * Prepares byte serialization of a file split information (start, length,
+ * hosts) using {@link ObjectOutputStream}.
*
* @param fsp file split to be serialized
* @return byte serialization of fsp
- * @throws IOException
+ * @throws IOException if I/O errors occur while writing to the underlying
+ * stream
*/
- public static byte[] prepareFragmentMetadata(FileSplit fsp) throws IOException {
+ public static byte[] prepareFragmentMetadata(FileSplit fsp)
+ throws IOException {
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
- ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+ ObjectOutputStream objectStream = new ObjectOutputStream(
+ byteArrayStream);
objectStream.writeLong(fsp.getStart());
objectStream.writeLong(fsp.getLength());
objectStream.writeObject(fsp.getLocations());
@@ -142,7 +146,7 @@ public class HdfsUtilities {
}
/**
- * Parse fragment metadata and return matching {@link FileSplit}
+ * Parses fragment metadata and return matching {@link FileSplit}.
*
* @param inputData request input data
* @return FileSplit with fragment metadata
@@ -151,10 +155,12 @@ public class HdfsUtilities {
try {
byte[] serializedLocation = inputData.getFragmentMetadata();
if (serializedLocation == null) {
- throw new IllegalArgumentException("Missing fragment location information");
+ throw new IllegalArgumentException(
+ "Missing fragment location information");
}
- ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedLocation);
+ ByteArrayInputStream bytesStream = new ByteArrayInputStream(
+ serializedLocation);
ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
long start = objectStream.readLong();
@@ -162,44 +168,56 @@ public class HdfsUtilities {
String[] hosts = (String[]) objectStream.readObject();
- FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()),
- start,
- end,
- hosts);
+ FileSplit fileSplit = new FileSplit(new Path(
+ inputData.getDataSource()), start, end, hosts);
- Log.debug("parsed file split: path " + inputData.getDataSource() +
- ", start " + start + ", end " + end +
- ", hosts " + ArrayUtils.toString(hosts));
+ Log.debug("parsed file split: path " + inputData.getDataSource()
+ + ", start " + start + ", end " + end + ", hosts "
+ + ArrayUtils.toString(hosts));
return fileSplit;
} catch (Exception e) {
- throw new RuntimeException("Exception while reading expected fragment metadata", e);
+ throw new RuntimeException(
+ "Exception while reading expected fragment metadata", e);
}
}
/**
- * Accessing the avro file through the "unsplittable" API just to get the schema.
- * The splittable API (AvroInputFormat) which is the one we will be using to fetch
- * the records, does not support getting the avro schema yet.
+ * Accesses the Avro file through the "unsplittable" API just to get the
+ * schema. The splittable API (AvroInputFormat), which is the one we will be
+ * using to fetch the records, does not support getting the Avro schema yet.
*
- * @param conf Hadoop configuration
+ * @param conf Hadoop configuration
* @param dataSource Avro file (e.g. fileName.avro) path
* @return the Avro schema
- * @throws IOException
+ * @throws IOException if an I/O error occurred while accessing the Avro schema file
*/
- public static Schema getAvroSchema(Configuration conf, String dataSource) throws IOException {
+ public static Schema getAvroSchema(Configuration conf, String dataSource)
+ throws IOException {
FsInput inStream = new FsInput(new Path(dataSource), conf);
DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
- DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(inStream, dummyReader);
- return dummyFileReader.getSchema();
+ DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
+ inStream, dummyReader);
+ Schema schema = dummyFileReader.getSchema();
+ dummyFileReader.close();
+ return schema;
}
+ /**
+ * Returns string serialization of list of fields. Fields of binary type
+ * (BYTEA) are converted to octal representation to make sure they will be
+ * relayed properly to the DB.
+ *
+ * @param complexRecord list of fields to be stringified
+ * @param delimiter delimiter between fields
+ * @return string of serialized fields using delimiter
+ */
public static String toString(List<OneField> complexRecord, String delimiter) {
StringBuilder buff = new StringBuilder();
String delim = ""; // first iteration has no delimiter
for (OneField complex : complexRecord) {
- if(complex.type == DataType.BYTEA.getOID()) {
+ if (complex.type == DataType.BYTEA.getOID()) {
/** Serialize byte array as string */
buff.append(delim);
Utilities.byteArrayToOctalString((byte[]) complex.val, buff);
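For context on the two methods reformatted above, prepareFragmentMetadata and parseFragmentMetadata form a plain ObjectOutputStream/ObjectInputStream round trip over three fields: the split start offset, its length, and the host list. The sketch below is illustrative only; the class and variable names are ours, not PXF's, and only the write/read order is taken from the patch.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;

    // Hypothetical helper mirroring the serialization order used above.
    public class FragmentMetadataSketch {

        // Serialize (start, length, hosts) of a FileSplit into a byte array.
        static byte[] serialize(FileSplit fsp) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeLong(fsp.getStart());       // split start offset
            out.writeLong(fsp.getLength());      // split length
            out.writeObject(fsp.getLocations()); // hosts holding the block
            out.close();
            return bytes.toByteArray();
        }

        // Rebuild a FileSplit from the bytes plus the data source path,
        // reading the fields back in the same order they were written.
        static FileSplit deserialize(byte[] meta, String dataSource)
                throws IOException, ClassNotFoundException {
            ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(meta));
            long start = in.readLong();
            long length = in.readLong();
            String[] hosts = (String[]) in.readObject();
            return new FileSplit(new Path(dataSource), start, length, hosts);
        }
    }

Because writeObject/readObject is used for the host array, both sides must agree on Java serialization; that is why parseFragmentMetadata wraps any failure in a RuntimeException instead of letting a ClassNotFoundException surface.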
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
index 89ac86e..a88ade0 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
@@ -8,11 +8,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
-import java.io.IOException;
import java.util.List;
/**
- * Adapter used for adding a recordkey field to the records output List<OneField>
+ * Adapter used for adding a recordkey field to the records output {@code List<OneField>}.
*/
public class RecordkeyAdapter {
private Log Log;
@@ -36,7 +35,7 @@ public class RecordkeyAdapter {
private ValConverter converter = null;
/**
- * Constructs a RecordkeyAdapter
+ * Constructs a RecordkeyAdapter.
*/
public RecordkeyAdapter() {
Log = LogFactory.getLog(RecordkeyAdapter.class);
@@ -45,9 +44,9 @@ public class RecordkeyAdapter {
/**
* Adds the recordkey to the end of the passed in recFields list.
* <p>
- * This method also verifies cases in which record keys are not supported
- * by the underlying source type, and therefore "illegally" requested.
- *
+ * This method also verifies cases in which record keys are not supported
+ * by the underlying source type, and therefore "illegally" requested.
+ *
* @param recFields existing list of record (non-key) fields and their values.
* @param input all input parameters coming from the client request
* @param onerow a row object which is used here in order to find out if
@@ -55,12 +54,11 @@ public class RecordkeyAdapter {
* @return 0 if record key not needed, or 1 if record key was appended
* @throws NoSuchFieldException when the given record type does not support
* recordkeys
- * @throws IOException
*/
public int appendRecordkeyField(List<OneField> recFields,
InputData input,
- OneRow onerow) throws NoSuchFieldException, IOException {
-
+ OneRow onerow) throws NoSuchFieldException {
+
/*
* user did not request the recordkey field in the
* "create external table" statement
@@ -107,7 +105,7 @@ public class RecordkeyAdapter {
* key is already a Java primitive we returned it as is If it is an unknown
* type we throw an exception
*/
- private Object extractVal(Object key) throws IOException {
+ private Object extractVal(Object key) {
if (extractor == null) {
extractor = InitializeExtractor(key);
}
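The extractVal change above just drops a throws clause for an exception that was never thrown; the method's job is still to unwrap the Hadoop Writable record key into a plain Java value before appendRecordkeyField appends it to the output list. A rough, simplified sketch of that unwrapping is shown below; the real class caches a per-type extractor instead of branching on every call, and the set of supported types here is an assumption, not taken from the patch.

    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.ByteWritable;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    // Illustrative only: unwrap a Writable record key into its Java counterpart.
    public class RecordkeySketch {
        static Object extractVal(Object key) {
            if (key instanceof IntWritable)     return ((IntWritable) key).get();
            if (key instanceof LongWritable)    return ((LongWritable) key).get();
            if (key instanceof FloatWritable)   return ((FloatWritable) key).get();
            if (key instanceof DoubleWritable)  return ((DoubleWritable) key).get();
            if (key instanceof ByteWritable)    return ((ByteWritable) key).get();
            if (key instanceof BooleanWritable) return ((BooleanWritable) key).get();
            if (key instanceof Text)            return key.toString();
            // Unknown key types are a usage error, as the surrounding comment notes.
            throw new UnsupportedOperationException(
                    "Unsupported recordkey type: " + key.getClass().getName());
        }
    }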
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
index 865a547..462c7a2 100644
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
@@ -15,17 +15,15 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-
/**
- * Accessor for Hive tables.
- * The accessor will open and read a split belonging to a Hive table.
- * Opening a split means creating the corresponding InputFormat and RecordReader required to access the
- * split's data. The actual record reading is done in the base class -
- * {@link com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor}.
- * <p/>
- * HiveAccessor will also enforce Hive partition filtering by filtering-out a split which does not
- * belong to a partition filter. Naturally, the partition filtering will be done only for Hive tables
- * that are partitioned.
+ * Accessor for Hive tables. The accessor will open and read a split belonging
+ * to a Hive table. Opening a split means creating the corresponding InputFormat
+ * and RecordReader required to access the split's data. The actual record
+ * reading is done in the base class -
+ * {@link com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor}. <br>
+ * HiveAccessor will also enforce Hive partition filtering by filtering out a
+ * split which does not belong to a partition filter. Naturally, the partition
+ * filtering will be done only for Hive tables that are partitioned.
*/
public class HiveAccessor extends HdfsSplittableDataAccessor {
private static final Log LOG = LogFactory.getLog(HiveAccessor.class);
@@ -46,15 +44,18 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
protected Boolean filterInFragmenter = false;
/**
- * Constructs a HiveAccessor and creates an InputFormat (derived from {@link org.apache.hadoop.mapred.InputFormat})
- * and the Hive partition fields
+ * Constructs a HiveAccessor and creates an InputFormat (derived from
+ * {@link org.apache.hadoop.mapred.InputFormat}) and the Hive partition
+ * fields
*
* @param input contains the InputFormat class name and the partition fields
+ * @throws Exception if the input format could not be created
*/
public HiveAccessor(InputData input) throws Exception {
/*
- * Unfortunately, Java does not allow us to call a function before calling the base constructor,
- * otherwise it would have been: super(input, createInputFormat(input))
+ * Unfortunately, Java does not allow us to call a function before
+ * calling the base constructor, otherwise it would have been:
+ * super(input, createInputFormat(input))
*/
super(input, null);
inputFormat = createInputFormat(input);
@@ -63,20 +64,21 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
/**
* Constructs a HiveAccessor
*
- * @param input contains the InputFormat class name and the partition fields
+ * @param input contains the InputFormat class name and the partition fields
* @param inputFormat Hive InputFormat
*/
- public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) throws Exception {
+ public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) {
super(input, inputFormat);
}
/**
- * openForRead
- * Enables Hive partition filtering
+ * Opens Hive data split for read. Enables Hive partition filtering. <br>
*
- * @return true if there are no partitions or there is no partition filter or
- * partition filter is set and the file currently opened by the accessor belongs
- * to the partition.
+ * @return true if there are no partitions or there is no partition filter
+ * or partition filter is set and the file currently opened by the
+ * accessor belongs to the partition.
+ * @throws Exception if the filter could not be built, the connection to
+ * Hive failed, or the resource failed to open
*/
@Override
public boolean openForRead() throws Exception {
@@ -86,27 +88,36 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
/**
* Creates the RecordReader suitable for this given split.
*
- * @param jobConf configuraton data for the Hadoop framework
- * @param split the split that was allocated for reading to this accessor
+ * @param jobConf configuration data for the Hadoop framework
+ * @param split the split that was allocated for reading to this accessor
+ * @return record reader
+ * @throws IOException if the record reader could not be created
*/
@Override
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
}
/*
- * Parse the user-data supplied by the HiveFragmenter from InputData. Based on the
- * user-data construct the partition fields and the InputFormat for current split
+ * Parses the user-data supplied by the HiveFragmenter from InputData. Based
+ * on the user-data, constructs the partition fields and the InputFormat for
+ * the current split.
*/
- private InputFormat<?, ?> createInputFormat(InputData input) throws Exception {
+ private InputFormat<?, ?> createInputFormat(InputData input)
+ throws Exception {
String userData = new String(input.getFragmentUserData());
String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
initPartitionFields(toks[3]);
filterInFragmenter = new Boolean(toks[4]);
- return HiveDataFragmenter.makeInputFormat(toks[0]/* inputFormat name */, jobConf);
+ return HiveDataFragmenter.makeInputFormat(
+ toks[0]/* inputFormat name */, jobConf);
}
- /* The partition fields are initialized one time base on userData provided by the fragmenter */
+ /*
+ * The partition fields are initialized once, based on the userData provided
+ * by the fragmenter.
+ */
void initPartitionFields(String partitionKeys) {
partitions = new LinkedList<HivePartition>();
if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
@@ -140,9 +151,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
boolean returnData = isFiltered(partitions, filter);
if (LOG.isDebugEnabled()) {
- LOG.debug("segmentId: " + inputData.getSegmentId() + " " + inputData.getDataSource() + "--" + filterStr + " returnData: " + returnData);
+ LOG.debug("segmentId: " + inputData.getSegmentId() + " "
+ + inputData.getDataSource() + "--" + filterStr
+ + " returnData: " + returnData);
if (filter instanceof List) {
- for (Object f : (List) filter) {
+ for (Object f : (List<?>) filter) {
printOneBasicFilter(f);
}
} else {
@@ -153,14 +166,16 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
return returnData;
}
- private boolean isFiltered(List<HivePartition> partitionFields, Object filter) {
+ private boolean isFiltered(List<HivePartition> partitionFields,
+ Object filter) {
if (filter instanceof List) {
/*
- * We are going over each filter in the filters list and test it against all the partition fields
- * since filters are connected only by AND operators, its enough for one filter to fail in order to
+ * We go over each filter in the filters list and test it against all
+ * the partition fields. Since filters are connected only by AND
+ * operators, it's enough for one filter to fail in order to
* deny this data.
*/
- for (Object f : (List) filter) {
+ for (Object f : (List<?>) filter) {
if (!testOneFilter(partitionFields, f, inputData)) {
return false;
}
@@ -172,21 +187,27 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
}
/*
- * We are testing one filter against all the partition fields.
- * The filter has the form "fieldA = valueA".
- * The partitions have the form partitionOne=valueOne/partitionTwo=ValueTwo/partitionThree=valueThree
- * 1. For a filter to match one of the partitions, lets say partitionA for example, we need:
- * fieldA = partittionOne and valueA = valueOne. If this condition occurs, we return true.
- * 2. If fieldA does not match any one of the partition fields we also return true, it means we ignore this filter
- * because it is not on a partition field.
- * 3. If fieldA = partittionOne and valueA != valueOne, then we return false.
+ * We are testing one filter against all the partition fields. The filter
+ * has the form "fieldA = valueA". The partitions have the form
+ * partitionOne=valueOne/partitionTwo=ValueTwo/partitionThree=valueThree.
+ * 1. For a filter to match one of the partitions, let's say partitionOne
+ * for example, we need: fieldA = partitionOne and valueA = valueOne. If
+ * this condition occurs, we return true. 2. If fieldA does not match any
+ * of the partition fields we also return true; it means we ignore this
+ * filter because it is not on a partition field. 3. If fieldA =
+ * partitionOne and valueA != valueOne, then we return false.
*/
- private boolean testOneFilter(List<HivePartition> partitionFields, Object filter, InputData input) {
+ private boolean testOneFilter(List<HivePartition> partitionFields,
+ Object filter, InputData input) {
// Let's look first at the filter
FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
boolean isFilterOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
- if (!isFilterOperationEqual) /* in case this is not an "equality filter" we ignore it here - in partition filtering */ {
+ if (!isFilterOperationEqual) /*
+ * in case this is not an "equality filter"
+ * we ignore it here - in partition
+ * filtering
+ */ {
return true;
}
@@ -197,12 +218,18 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
for (HivePartition partition : partitionFields) {
if (filterColumnName.equals(partition.name)) {
- /* the filter field matches a partition field, but the values do not match */
+ /*
+ * the filter field matches a partition field, but the values do
+ * not match
+ */
return filterValue.equals(partition.val);
}
}
- /* filter field did not match any partition field, so we ignore this filter and hence return true */
+ /*
+ * filter field did not match any partition field, so we ignore this
+ * filter and hence return true
+ */
return true;
}
@@ -211,6 +238,7 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
boolean isOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
int columnIndex = bFilter.getColumn().index();
String value = bFilter.getConstant().constant().toString();
- LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: " + columnIndex + " value: " + value);
+ LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: "
+ + columnIndex + " value: " + value);
}
}
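The partition-filtering walk documented in the comments above reduces to three rules: filters are ANDed together, only equality filters on partition columns can reject a split, and every other filter is ignored. Below is a simplified sketch of that decision, using plain strings instead of the FilterParser.BasicFilter and HivePartition types, so it shows the logic rather than the PXF API.

    import java.util.List;
    import java.util.Map;

    // Simplified sketch: each filter is a {columnName, value} pair and
    // partitions maps partition column -> value for the split being tested.
    public class PartitionFilterSketch {
        static boolean splitMatches(Map<String, String> partitions,
                                    List<String[]> equalityFilters) {
            for (String[] f : equalityFilters) {     // filters are ANDed
                String column = f[0];
                String value = f[1];
                String partitionValue = partitions.get(column);
                if (partitionValue == null) {
                    continue;                        // not a partition column: ignore
                }
                if (!partitionValue.equals(value)) {
                    return false;                    // one mismatch rejects the split
                }
            }
            return true;                             // nothing rejected the split
        }
    }

As in HiveAccessor, a split is only skipped when a filter names a partition column and the values disagree; filters on ordinary columns are left for later evaluation.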
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
index 66fb0cb..6ebc62e 100644
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
@@ -35,8 +35,8 @@ import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
import com.pivotal.pxf.plugins.hive.utilities.HiveUtilities;
/**
- * Fragmenter class for HIVE tables
- * <p/>
+ * Fragmenter class for HIVE tables.
+ * <br>
* Given a Hive table and its partitions divide the data into fragments (here a
* data fragment is actually a HDFS file block) and return a list of them. Each
* data fragment will contain the following information:
@@ -70,7 +70,7 @@ public class HiveDataFragmenter extends Fragmenter {
private Set<String> setPartitions = new TreeSet<String>(
String.CASE_INSENSITIVE_ORDER);
- /*
+ /**
* A Hive table unit - means a subset of the HIVE table, where we can say
* that for all files in this subset, they all have the same InputFormat and
* Serde. For a partitioned table the HiveTableUnit will be one partition
@@ -101,7 +101,7 @@ public class HiveDataFragmenter extends Fragmenter {
}
/**
- * Constructs a HiveDataFragmenter object
+ * Constructs a HiveDataFragmenter object.
*
* @param inputData all input parameters coming from the client
*/
@@ -110,7 +110,7 @@ public class HiveDataFragmenter extends Fragmenter {
}
/**
- * Constructs a HiveDataFragmenter object
+ * Constructs a HiveDataFragmenter object.
*
* @param inputData all input parameters coming from the client
* @param clazz Class for JobConf
@@ -131,11 +131,12 @@ public class HiveDataFragmenter extends Fragmenter {
}
/**
- * Creates the partition InputFormat
+ * Creates the partition InputFormat.
*
* @param inputFormatName input format class name
* @param jobConf configuration data for the Hadoop framework
* @return a {@link org.apache.hadoop.mapred.InputFormat} derived object
+ * @throws Exception if the input format could not be created
*/
public static InputFormat<?, ?> makeInputFormat(String inputFormatName,
JobConf jobConf)