Posted to commits@hawq.apache.org by nh...@apache.org on 2015/10/08 20:37:28 UTC
[5/5] incubator-hawq git commit: HAWQ-28. JavaDoc fixes for PXF
HAWQ-28. JavaDoc fixes for PXF
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/dc115ff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/dc115ff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/dc115ff4
Branch: refs/heads/HAWQ-28
Commit: dc115ff4db8c2785d2060fdfc0dd5697a3de30ec
Parents: 96779dd
Author: Noa Horn <nh...@pivotal.io>
Authored: Thu Oct 8 11:37:03 2015 -0700
Committer: Noa Horn <nh...@pivotal.io>
Committed: Thu Oct 8 11:37:14 2015 -0700
----------------------------------------------------------------------
.../main/java/com/pivotal/pxf/api/Analyzer.java | 6 +-
.../java/com/pivotal/pxf/api/AnalyzerStats.java | 4 +-
.../java/com/pivotal/pxf/api/FilterParser.java | 25 +-
.../java/com/pivotal/pxf/api/Fragmenter.java | 2 +-
.../main/java/com/pivotal/pxf/api/Metadata.java | 2 +-
.../com/pivotal/pxf/api/MetadataFetcher.java | 4 +-
.../java/com/pivotal/pxf/api/ReadAccessor.java | 6 +-
.../java/com/pivotal/pxf/api/ReadResolver.java | 2 +-
.../java/com/pivotal/pxf/api/WriteAccessor.java | 6 +-
.../java/com/pivotal/pxf/api/WriteResolver.java | 2 +-
.../pxf/api/utilities/ColumnDescriptor.java | 13 +-
.../pivotal/pxf/api/utilities/InputData.java | 197 +++++---
.../pxf/plugins/hbase/HBaseFilterBuilder.java | 14 +-
.../pxf/plugins/hbase/HBaseResolver.java | 14 +-
.../hbase/utilities/HBaseColumnDescriptor.java | 12 +-
.../hbase/utilities/HBaseIntegerComparator.java | 16 +-
.../hbase/utilities/HBaseLookupTable.java | 12 +-
.../hbase/utilities/HBaseTupleDescription.java | 12 +-
.../plugins/hbase/utilities/HBaseUtilities.java | 8 +-
.../pxf/plugins/hdfs/AvroFileAccessor.java | 14 +-
.../pivotal/pxf/plugins/hdfs/AvroResolver.java | 212 +++++---
.../pivotal/pxf/plugins/hdfs/ChunkReader.java | 312 ++++++------
.../pxf/plugins/hdfs/ChunkRecordReader.java | 482 ++++++++++---------
.../pivotal/pxf/plugins/hdfs/ChunkWritable.java | 14 +-
.../pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java | 27 +-
.../plugins/hdfs/HdfsAtomicDataAccessor.java | 25 +-
.../pxf/plugins/hdfs/HdfsDataFragmenter.java | 43 +-
.../hdfs/HdfsSplittableDataAccessor.java | 81 ++--
.../pxf/plugins/hdfs/LineBreakAccessor.java | 59 +--
.../plugins/hdfs/QuotedLineBreakAccessor.java | 10 +-
.../pxf/plugins/hdfs/SequenceFileAccessor.java | 98 ++--
.../pxf/plugins/hdfs/StringPassResolver.java | 25 +-
.../pxf/plugins/hdfs/WritableResolver.java | 26 +-
.../plugins/hdfs/utilities/HdfsUtilities.java | 104 ++--
.../hdfs/utilities/RecordkeyAdapter.java | 18 +-
.../pivotal/pxf/plugins/hive/HiveAccessor.java | 124 +++--
.../pxf/plugins/hive/HiveDataFragmenter.java | 13 +-
.../pxf/plugins/hive/HiveFilterBuilder.java | 96 ++--
.../plugins/hive/HiveInputFormatFragmenter.java | 64 +--
.../pxf/plugins/hive/HiveLineBreakAccessor.java | 5 +-
.../pxf/plugins/hive/HiveRCFileAccessor.java | 5 +-
.../pivotal/pxf/plugins/hive/HiveResolver.java | 17 +-
.../plugins/hive/utilities/HiveUtilities.java | 42 +-
.../pxf/service/BridgeOutputBuilder.java | 170 ++++---
.../pivotal/pxf/service/FragmentsResponse.java | 7 +-
.../pxf/service/FragmentsResponseFormatter.java | 8 +-
.../pxf/service/MetadataResponseFormatter.java | 4 +
.../pivotal/pxf/service/io/BufferWritable.java | 58 +--
.../pivotal/pxf/service/io/GPDBWritable.java | 74 +--
.../java/com/pivotal/pxf/service/io/Text.java | 148 +++---
.../com/pivotal/pxf/service/io/Writable.java | 5 +-
.../pxf/service/rest/InvalidPathResource.java | 2 +-
.../pxf/service/rest/MetadataResource.java | 40 +-
.../pivotal/pxf/service/rest/RestResource.java | 3 +-
.../service/utilities/CustomWebappLoader.java | 54 +--
.../pxf/service/utilities/ProtocolData.java | 129 +++--
.../pxf/service/utilities/Utilities.java | 22 +-
57 files changed, 1728 insertions(+), 1269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Analyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Analyzer.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Analyzer.java
index c18a0da..673aed4 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Analyzer.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Analyzer.java
@@ -7,8 +7,8 @@ import com.pivotal.pxf.api.utilities.Plugin;
* Abstract class that defines getting statistics for ANALYZE.
* {@link #getEstimatedStats} returns statistics for a given path
* (block size, number of blocks, number of tuples).
- * Used when calling ANALYZE on a PXF external table, to get
- * table's statistics that are used by the optimizer to plan queries.
+ * Used when calling ANALYZE on a PXF external table, to get
+ * table's statistics that are used by the optimizer to plan queries.
*/
public abstract class Analyzer extends Plugin {
/**
@@ -28,7 +28,7 @@ public abstract class Analyzer extends Plugin {
*
* @param data the data source name (e.g, file, dir, wildcard, table name).
* @return AnalyzerStats the data statistics in json format.
- * @throws Exception
+ * @throws Exception if it fails to get stats
*/
public AnalyzerStats getEstimatedStats(String data) throws Exception {
/* Return default values */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/AnalyzerStats.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/AnalyzerStats.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/AnalyzerStats.java
index d831c1e..9f9206c 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/AnalyzerStats.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/AnalyzerStats.java
@@ -44,9 +44,9 @@ public class AnalyzerStats {
*
* @param stats the data to be serialized
* @return the result in json format
- * @throws IOException
+ * @throws IOException if converting to JSON format failed
*/
- public static String dataToJSON(AnalyzerStats stats) throws IOException {
+ public static String dataToJSON(AnalyzerStats stats) throws IOException {
ObjectMapper mapper = new ObjectMapper();
// mapper serializes all members of the class by default
return "{\"PXFDataSourceStats\":" + mapper.writeValueAsString(stats) + "}";
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/FilterParser.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/FilterParser.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/FilterParser.java
index 9489601..a3803c5 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/FilterParser.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/FilterParser.java
@@ -8,19 +8,21 @@ import java.util.Stack;
* The parser code which goes over a filter string and pushes operands onto a stack.
* Once an operation is read, the evaluate function is called for the {@link FilterBuilder}
* interface with two pop-ed operands.
- *
+ * <br>
* A string of filters looks like this:
- * a2c5o1a1c"abc"o2o7
- * which means column#2 < 5 AND column#1 > "abc"
- *
+ * <code>a2c5o1a1c"abc"o2o7</code>
+ * which means {@code column#2 < 5 AND column#1 > "abc"}
+ * <br>
* It is a RPN serialized representation of a filters tree in GPDB where
- * a means an attribute (column)
- * c means a constant (either string or numeric)
- * o means operator
+ * <ul>
+ * <li> a means an attribute (column)</li>
+ * <li>c means a constant (either string or numeric)</li>
+ * <li>o means operator</li>
+ * </ul>
*
* Assuming all operators are binary, RPN representation allows it to be read left to right easily.
- *
- * FilterParser only knows about columns and constants. The rest is up to the {@link FilterBuilder} implementer. *
+ * <br>
+ * FilterParser only knows about columns and constants. The rest is up to the {@link FilterBuilder} implementer.
* FilterParser makes sure column objects are always on the left of the expression (when relevant).
*/
public class FilterParser {
@@ -55,7 +57,7 @@ public class FilterParser {
* @param left the left operand
* @param right the right operand
* @return the built filter
- * @throws Exception
+ * @throws Exception if building the filter failed
*/
public Object build(Operation operation, Object left, Object right) throws Exception;
}
@@ -125,6 +127,7 @@ public class FilterParser {
/**
* Thrown when a filter's parsing exception occurs.
*/
+ @SuppressWarnings("serial")
class FilterStringSyntaxException extends Exception {
FilterStringSyntaxException(String desc) {
super(desc + " (filter string: '" + filterString + "')");
@@ -146,7 +149,7 @@ public class FilterParser {
*
* @param filter the filter to parse
* @return the parsed filter
- * @throws Exception
+ * @throws Exception if the filter string had wrong syntax
*/
public Object parse(String filter) throws Exception {
index = 0;
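For illustration only (not part of this commit): a minimal FilterBuilder sketch based on the interface and filter-string format documented above. The PrintingFilterBuilder class name, the reference to the nested type as FilterParser.Operation, and the use of toString() on the operands are assumptions made for the example.

    import com.pivotal.pxf.api.FilterParser;

    // Hypothetical builder: renders each parsed operation as a readable string.
    // FilterParser pushes operands and calls build() once per operator it reads,
    // keeping column operands on the left when relevant (see Javadoc above).
    public class PrintingFilterBuilder implements FilterParser.FilterBuilder {
        @Override
        public Object build(FilterParser.Operation operation, Object left, Object right)
                throws Exception {
            return "(" + left + " " + operation + " " + right + ")";
        }

        public static void main(String[] args) throws Exception {
            FilterParser parser = new FilterParser(new PrintingFilterBuilder());
            // a2c5o1a1c"abc"o2o7  encodes  column#2 < 5 AND column#1 > "abc"
            System.out.println(parser.parse("a2c5o1a1c\"abc\"o2o7"));
        }
    }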
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Fragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Fragmenter.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Fragmenter.java
index dc7f9fb..831a7a6 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Fragmenter.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Fragmenter.java
@@ -27,7 +27,7 @@ public abstract class Fragmenter extends Plugin {
* Used to get fragments of data that could be read in parallel from the different segments.
*
* @return list of data fragments
- * @throws Exception
+ * @throws Exception if fragment list could not be retrieved
*/
public abstract List<Fragment> getFragments() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Metadata.java
index bf02c30..5a31360 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/Metadata.java
@@ -37,7 +37,7 @@ public class Metadata {
}
/**
- * Returns full table name in the form <db_name>.<table_name>
+ * Returns full table name in the form db_name.table_name
*/
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/MetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/MetadataFetcher.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/MetadataFetcher.java
index cc3ed30..5ed9b24 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/MetadataFetcher.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/MetadataFetcher.java
@@ -16,10 +16,10 @@ public abstract class MetadataFetcher {
/**
* Gets a metadata of a given table
- *
+ *
* @param tableName table name
* @return metadata of given table
- * @throws Exception
+ * @throws Exception if metadata information could not be retrieved
*/
public abstract Metadata getTableMetadata(String tableName) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadAccessor.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadAccessor.java
index 6aa42a3..85469fb 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadAccessor.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadAccessor.java
@@ -8,7 +8,7 @@ public interface ReadAccessor {
* Opens the resource for reading.
*
* @return true if the resource is successfully opened
- * @throws Exception
+ * @throws Exception if opening the resource failed
*/
boolean openForRead() throws Exception;
@@ -16,14 +16,14 @@ public interface ReadAccessor {
* Reads the next object.
*
* @return the object which was read
- * @throws Exception
+ * @throws Exception if reading from the resource failed
*/
OneRow readNextObject() throws Exception;
/**
* Closes the resource.
*
- * @throws Exception
+ * @throws Exception if closing the resource failed
*/
void closeForRead() throws Exception;
}
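For illustration only (not part of this commit): a minimal ReadAccessor that serves pre-built rows from memory, following the open/read/close contract documented above. The class name and the end-of-data convention (returning null when the rows are exhausted) are assumptions made for the example.

    import com.pivotal.pxf.api.OneRow;
    import com.pivotal.pxf.api.ReadAccessor;

    import java.util.Iterator;
    import java.util.List;

    // Hypothetical accessor: returns pre-built rows, one per readNextObject() call.
    public class InMemoryReadAccessor implements ReadAccessor {
        private final List<OneRow> rows;
        private Iterator<OneRow> iterator;

        public InMemoryReadAccessor(List<OneRow> rows) {
            this.rows = rows;
        }

        @Override
        public boolean openForRead() throws Exception {
            iterator = rows.iterator();
            return true; // resource "opened" successfully
        }

        @Override
        public OneRow readNextObject() throws Exception {
            // Assumed convention: null signals that there is no more data to read.
            return iterator.hasNext() ? iterator.next() : null;
        }

        @Override
        public void closeForRead() throws Exception {
            iterator = null;
        }
    }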
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadResolver.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadResolver.java
index 109ec09..2fe850d 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadResolver.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/ReadResolver.java
@@ -12,7 +12,7 @@ public interface ReadResolver {
*
* @param row the row to get the fields from
* @return the {@link OneField} list of one row.
- * @throws Exception
+ * @throws Exception if decomposing the row into fields failed
*/
List<OneField> getFields(OneRow row) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteAccessor.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteAccessor.java
index da49e50..85b2535 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteAccessor.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteAccessor.java
@@ -8,7 +8,7 @@ public interface WriteAccessor {
* Opens the resource for write.
*
* @return true if the resource is successfully opened
- * @throws Exception
+ * @throws Exception if opening the resource failed
*/
boolean openForWrite() throws Exception;
@@ -17,14 +17,14 @@ public interface WriteAccessor {
*
* @param onerow the object to be written
* @return true if the write succeeded
- * @throws Exception
+ * @throws Exception if writing to the resource failed
*/
boolean writeNextObject(OneRow onerow) throws Exception;
/**
* Closes the resource for write.
*
- * @throws Exception
+ * @throws Exception if closing the resource failed
*/
void closeForWrite() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteResolver.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteResolver.java
index 63dc692..1749572 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteResolver.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/WriteResolver.java
@@ -13,7 +13,7 @@ public interface WriteResolver {
*
* @param record list of {@link OneField}
* @return the constructed {@link OneRow}
- * @throws Exception
+ * @throws Exception if constructing a row from the fields failed
*/
OneRow setFields(List<OneField> record) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/ColumnDescriptor.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/ColumnDescriptor.java
index 4e02896..70b2502 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/ColumnDescriptor.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/ColumnDescriptor.java
@@ -5,13 +5,14 @@ package com.pivotal.pxf.api.utilities;
* Currently it means a name, a type id (HAWQ/GPDB OID), a type name and column index.
*/
public class ColumnDescriptor {
-
+
int gpdbColumnTypeCode;
String gpdbColumnName;
String gpdbColumnTypeName;
int gpdbColumnIndex;
+
/**
- * Reserved word for a table record key.
+ * Reserved word for a table record key.
* A field with this name will be treated as record key.
*/
public static final String RECORD_KEY_NAME = "recordkey";
@@ -59,11 +60,15 @@ public class ColumnDescriptor {
return gpdbColumnTypeName;
}
- /** Returns <tt>true</tt> if {@link #gpdbColumnName} is a {@link #RECORD_KEY_NAME}. */
+ /**
+ * Returns <tt>true</tt> if {@link #gpdbColumnName} is a {@link #RECORD_KEY_NAME}.
+ *
+ * @return whether column is a record key column
+ */
public boolean isKeyColumn() {
return RECORD_KEY_NAME.equalsIgnoreCase(gpdbColumnName);
}
-
+
@Override
public String toString() {
return "ColumnDescriptor [gpdbColumnTypeCode=" + gpdbColumnTypeCode
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/InputData.java
index c8133c7..f7c2f78 100644
--- a/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/com/pivotal/pxf/api/utilities/InputData.java
@@ -10,7 +10,7 @@ import java.util.*;
* coming from client applications, such as Hawq.
*/
public class InputData {
-
+
public static final int INVALID_SPLIT_IDX = -1;
private static final Log LOG = LogFactory.getLog(InputData.class);
@@ -32,22 +32,22 @@ public class InputData {
protected int dataFragment; /* should be deprecated */
/**
- * When false the bridge has to run in synchronized mode.
- * default value - true.
+ * When false the bridge has to run in synchronized mode. default value -
+ * true.
*/
protected boolean threadSafe;
- /**
- * The name of the recordkey column. It can appear in any location in the
- * columns list. By specifying the recordkey column, the user declares that
- * he is interested to receive for every record retrieved also the the
- * recordkey in the database. The recordkey is present in HBase table (it is
- * called rowkey), and in sequence files. When the HDFS storage element
- * queried will not have a recordkey and the user will still specify it in
- * the "create external table" statement, then the values for this field
- * will be null. This field will always be the first field in the tuple
- * returned.
- */
+ /**
+ * The name of the recordkey column. It can appear in any location in the
+ * columns list. By specifying the recordkey column, the user declares that
+ * he is interested to receive for every record retrieved also the
+ * recordkey in the database. The recordkey is present in HBase table (it is
+ * called rowkey), and in sequence files. When the HDFS storage element
+ * queried will not have a recordkey and the user will still specify it in
+ * the "create external table" statement, then the values for this field
+ * will be null. This field will always be the first field in the tuple
+ * returned.
+ */
protected ColumnDescriptor recordkeyColumn;
/**
@@ -55,10 +55,10 @@ public class InputData {
*/
public InputData() {
}
-
+
/**
- * Constructs an InputData from a copy.
- * Used to create from an extending class.
+ * Constructs an InputData from a copy. Used to create from an extending
+ * class.
*
* @param copy the input data to copy
*/
@@ -89,122 +89,183 @@ public class InputData {
* @param userProp the lookup user property
* @return property value as a String
*/
- public String getUserProperty(String userProp) {
+ public String getUserProperty(String userProp) {
return requestParametersMap.get("X-GP-" + userProp.toUpperCase());
}
/**
- * set the byte serialization of a fragment meta data
+ * Sets the byte serialization of a fragment meta data.
+ *
* @param location start, len, and location of the fragment
*/
public void setFragmentMetadata(byte[] location) {
this.fragmentMetadata = location;
}
- /** the byte serialization of a data fragment */
+ /**
+ * The byte serialization of a data fragment.
+ *
+ * @return serialized fragment metadata
+ */
public byte[] getFragmentMetadata() {
return fragmentMetadata;
}
- /**
- * Gets any custom user data that may have been passed from the
- * fragmenter. Will mostly be used by the accessor or resolver.
+ /**
+ * Gets any custom user data that may have been passed from the fragmenter.
+ * Will mostly be used by the accessor or resolver.
+ *
+ * @return fragment user data
*/
public byte[] getFragmentUserData() {
return userData;
}
-
- /**
- * Sets any custom user data that needs to be shared across plugins.
- * Will mostly be set by the fragmenter.
+
+ /**
+ * Sets any custom user data that needs to be shared across plugins. Will
+ * mostly be set by the fragmenter.
+ *
+ * @param userData user data
*/
public void setFragmentUserData(byte[] userData) {
this.userData = userData;
}
- /** Returns the number of segments in GP. */
+ /**
+ * Returns the number of segments in HAWQ.
+ *
+ * @return number of segments
+ */
public int getTotalSegments() {
return totalSegments;
}
- /** Returns the current segment ID. */
+ /**
+ * Returns the current segment ID in HAWQ.
+ *
+ * @return current segment ID
+ */
public int getSegmentId() {
return segmentId;
}
- /** Returns true if there is a filter string to parse. */
+ /**
+ * Returns true if there is a filter string to parse.
+ *
+ * @return whether there is a filter string
+ */
public boolean hasFilter() {
return filterStringValid;
}
- /** Returns the filter string, <tt>null</tt> if #hasFilter is <tt>false</tt> */
+ /**
+ * Returns the filter string, <tt>null</tt> if #hasFilter is <tt>false</tt>.
+ *
+ * @return the filter string or null
+ */
public String getFilterString() {
return filterString;
}
- /** Returns tuple description. */
+ /**
+ * Returns tuple description.
+ *
+ * @return tuple description
+ */
public ArrayList<ColumnDescriptor> getTupleDescription() {
return tupleDescription;
}
- /** Returns the number of columns in tuple description. */
+ /**
+ * Returns the number of columns in tuple description.
+ *
+ * @return number of columns
+ */
public int getColumns() {
return tupleDescription.size();
}
- /** Returns column index from tuple description. */
+ /**
+ * Returns column index from tuple description.
+ *
+ * @param index index of column
+ * @return column by index
+ */
public ColumnDescriptor getColumn(int index) {
return tupleDescription.get(index);
}
- /**
- * Returns the column descriptor of the recordkey column. If the recordkey
- * column was not specified by the user in the create table statement will
- * return null.
- */
+ /**
+ * Returns the column descriptor of the recordkey column. If the recordkey
+ * column was not specified by the user in the create table statement will
+ * return null.
+ *
+ * @return column of record key or null
+ */
public ColumnDescriptor getRecordkeyColumn() {
return recordkeyColumn;
}
- /** Returns the data source of the required resource (i.e a file path or a table name). */
+ /**
+ * Returns the data source of the required resource (i.e a file path or a
+ * table name).
+ *
+ * @return data source
+ */
public String getDataSource() {
return dataSource;
}
- /** Sets the data source for the required resource */
+ /**
+ * Sets the data source for the required resource.
+ *
+ * @param dataSource data source to be set
+ */
public void setDataSource(String dataSource) {
this.dataSource = dataSource;
}
- /** Returns the ClassName for the java class that was defined as Accessor */
+ /**
+ * Returns the ClassName for the java class that was defined as Accessor.
+ *
+ * @return class name for Accessor
+ */
public String getAccessor() {
return accessor;
}
- /** Returns the ClassName for the java class that was defined as Resolver */
+ /**
+ * Returns the ClassName for the java class that was defined as Resolver.
+ *
+ * @return class name for Resolver
+ */
public String getResolver() {
return resolver;
}
- /**
- * Returns the ClassName for the java class that was defined as Fragmenter
- * or null if no fragmenter was defined
- */
+ /**
+ * Returns the ClassName for the java class that was defined as Fragmenter
+ * or null if no fragmenter was defined.
+ *
+ * @return class name for Fragmenter or null
+ */
public String getFragmenter() {
- return fragmenter;
+ return fragmenter;
}
- /**
- * Returns the ClassName for the java class that was defined as Analyzer or
- * null if no analyzer was defined
- */
+ /**
+ * Returns the ClassName for the java class that was defined as Analyzer or
+ * null if no analyzer was defined.
+ *
+ * @return class name for Analyzer or null
+ */
public String getAnalyzer() {
- return analyzer;
+ return analyzer;
}
/**
- * Returns the contents of pxf_remote_service_login set in Hawq.
- * Should the user set it to an empty string this function will return null.
+ * Returns the contents of pxf_remote_service_login set in Hawq. Should the
+ * user set it to an empty string this function will return null.
*
* @return remote login details if set, null otherwise
*/
@@ -213,8 +274,8 @@ public class InputData {
}
/**
- * Returns the contents of pxf_remote_service_secret set in Hawq.
- * Should the user set it to an empty string this function will return null.
+ * Returns the contents of pxf_remote_service_secret set in Hawq. Should the
+ * user set it to an empty string this function will return null.
*
* @return remote password if set, null otherwise
*/
@@ -222,16 +283,24 @@ public class InputData {
return remoteSecret;
}
+ /**
+ * Returns whether this request is thread safe.
+ * If it is not, the request will be handled sequentially and not in parallel.
+ *
+ * @return whether the request is thread safe
+ */
public boolean isThreadSafe() {
return threadSafe;
}
- /**
- * Returns a data fragment index. plan to deprecate it in favor of using
- * getFragmentMetadata().
- */
- public int getDataFragment() {
- return dataFragment;
- }
+ /**
+ * Returns a data fragment index. plan to deprecate it in favor of using
+ * getFragmentMetadata().
+ *
+ * @return data fragment index
+ */
+ public int getDataFragment() {
+ return dataFragment;
+ }
}
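For illustration only (not part of this commit): a sketch of how a plugin constructor might consult InputData through the accessors documented above. The ExamplePlugin class name and the "COMPRESSION" user property are hypothetical and used only to demonstrate the calls.

    import com.pivotal.pxf.api.utilities.ColumnDescriptor;
    import com.pivotal.pxf.api.utilities.InputData;
    import com.pivotal.pxf.api.utilities.Plugin;

    // Hypothetical plugin showing typical reads from InputData.
    public class ExamplePlugin extends Plugin {

        public ExamplePlugin(InputData input) {
            super(input);
            // Data source for this request, e.g. a file path or a table name.
            String dataSource = input.getDataSource();
            // Optional user property from the external table definition
            // (looked up internally as the "X-GP-COMPRESSION" request parameter).
            String compression = input.getUserProperty("COMPRESSION");
            // A filter string is only meaningful when hasFilter() is true.
            String filter = input.hasFilter() ? input.getFilterString() : null;
            // One ColumnDescriptor per column of the external table.
            for (int i = 0; i < input.getColumns(); i++) {
                ColumnDescriptor column = input.getColumn(i);
                boolean isKey = column.isKeyColumn();
            }
        }
    }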
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
index 5bce949..82b920b 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseFilterBuilder.java
@@ -45,6 +45,10 @@ public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
/**
* Translates a filterString into a HBase {@link Filter} object.
+ *
+ * @param filterString filter string
+ * @return filter object
+ * @throws Exception if parsing failed
*/
public Filter getFilterObject(String filterString) throws Exception {
FilterParser parser = new FilterParser(this);
@@ -59,9 +63,11 @@ public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
/**
* Returns the startKey for scanning the HBase table.
- * If the user specified a {@code > / >=} operation
+ * If the user specified a {@code > / >=} operation
* on a textual row key column, this value will be returned.
* Otherwise, the start of table.
+ *
+ * @return start key for scanning HBase table
*/
public byte[] startKey() {
return startKey;
@@ -72,6 +78,8 @@ public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
* If the user specified a {@code < / <=} operation
* on a textual row key column, this value will be returned.
* Otherwise, the end of table.
+ *
+ * @return end key for scanning HBase table
*/
public byte[] endKey() {
return endKey;
@@ -87,7 +95,7 @@ public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
* The only supported operation is {@code AND}. </li>
* </ol>
* <p>
- * This function is called by {@link FilterParser},
+ * This function is called by {@link FilterParser},
* each time the parser comes across an operator.
*/
@Override
@@ -138,7 +146,7 @@ public class HBaseFilterBuilder implements FilterParser.FilterBuilder {
constant.constant());
/**
- * If row key is of type TEXT, allow filter in start/stop row key API in
+ * If row key is of type TEXT, allow filter in start/stop row key API in
* HBaseAccessor/Scan object.
*/
if (textualRowKey(hbaseColumn)) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
index 3815dc4..0b4e364 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/HBaseResolver.java
@@ -17,7 +17,7 @@ import java.util.List;
/**
* Record resolver for HBase.
- *
+ *
* The class is responsible to convert rows from HBase scans (returned as {@link Result} objects)
* into a List of {@link OneField} objects.
* That also includes the conversion process of each HBase column's value into its HAWQ assigned type.
@@ -26,9 +26,11 @@ import java.util.List;
*/
public class HBaseResolver extends Plugin implements ReadResolver {
private HBaseTupleDescription tupleDescription;
-
+
/**
* Constructs a resolver and initializes the table's tuple description.
+ *
+ * @param input query information, contains HBase table name and filter
*/
public HBaseResolver(InputData input) {
super(input);
@@ -37,9 +39,9 @@ public class HBaseResolver extends Plugin implements ReadResolver {
/**
* Splits an HBase {@link Result} object into a list of {@link OneField},
- * based on the table's tuple description.
+ * based on the table's tuple description.
* Each field is converted from HBase bytes into its column description type.
- *
+ *
* @return list of fields
*/
@Override
@@ -70,7 +72,7 @@ public class HBaseResolver extends Plugin implements ReadResolver {
/**
* Converts given byte array value to the matching java object, according to
* the given type code.
- *
+ *
* @param typeCode ColumnDescriptor type id
* @param typeName type name. Used for error messages
* @param val value to be converted
@@ -127,7 +129,7 @@ public class HBaseResolver extends Plugin implements ReadResolver {
/**
* Returns the value of a column from a Result object.
- *
+ *
* @param result HBase table row
* @param column HBase column to be retrieved
* @return HBase column value
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
index 3057d72..cf5a897 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
@@ -14,6 +14,8 @@ public class HBaseColumnDescriptor extends ColumnDescriptor {
/**
* Constructs a column descriptor using the given copy's column name.
+ *
+ * @param copy column descriptor to be copied
*/
public HBaseColumnDescriptor(ColumnDescriptor copy) {
this(copy, copy.columnName().getBytes());
@@ -28,9 +30,9 @@ public class HBaseColumnDescriptor extends ColumnDescriptor {
* <li>recordkey - Row key column (case insensitive).</li>
* </ol>
* <p>
- * For recordkey, no HBase name is created.
- *
- * @param copy column descriptor
+ * For recordkey, no HBase name is created.
+ *
+ * @param copy column descriptor
* @param newColumnName HBase column name - can be different than the given column descriptor name.
*/
public HBaseColumnDescriptor(ColumnDescriptor copy, byte[] newColumnName) {
@@ -49,6 +51,8 @@ public class HBaseColumnDescriptor extends ColumnDescriptor {
/**
* Returns the family column name.
* (E.g. "cf1:q2" will return "cf1")
+ *
+ * @return family column name
*/
public byte[] columnFamilyBytes() {
return columnFamily;
@@ -57,6 +61,8 @@ public class HBaseColumnDescriptor extends ColumnDescriptor {
/**
* Returns the qualifier column name.
* (E.g. "cf1:q2" will return "q2")
+ *
+ * @return qualifier column name
*/
public byte[] qualifierBytes() {
return qualifier;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
index 6a9cc4f..a935534 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
@@ -14,7 +14,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
* <p>
* To use with HBase it must reside in the classpath of every region server.
* <p>
- * It converts a value into {@link Long} before comparing.
+ * It converts a value into {@link Long} before comparing.
* The filter is good for any integer numeric comparison i.e. integer, bigint, smallint.
* <p>
* according to HBase 0.96 requirements, this must serialized using Protocol Buffers
@@ -25,7 +25,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class HBaseIntegerComparator extends ByteArrayComparable {
private Long val;
-
+
public HBaseIntegerComparator(Long inVal) {
super(Bytes.toBytes(inVal));
this.val = inVal;
@@ -33,6 +33,10 @@ public class HBaseIntegerComparator extends ByteArrayComparable {
/**
* The comparison function. Currently uses {@link Long#parseLong(String)}.
+ *
+ * @return 0 if equal;
+ * a value less than 0 if row value is less than filter value;
+ * and a value greater than 0 if the row value is greater than the filter value.
*/
@Override
public int compareTo(byte[] value, int offset, int length) {
@@ -42,7 +46,7 @@ public class HBaseIntegerComparator extends ByteArrayComparable {
if (length == 0)
return 1; // empty line, can't compare.
- /**
+ /**
* TODO optimize by parsing the bytes directly.
* Maybe we can even determine if it is an int or a string encoded.
*/
@@ -53,6 +57,8 @@ public class HBaseIntegerComparator extends ByteArrayComparable {
/**
* Returns the comparator serialized using Protocol Buffers.
+ *
+ * @return serialized comparator
*/
@Override
public byte[] toByteArray() {
@@ -64,12 +70,12 @@ public class HBaseIntegerComparator extends ByteArrayComparable {
/**
* Hides ("overrides") a static method in {@link ByteArrayComparable}.
* This method will be called in deserialization.
- *
+ *
* @param pbBytes
* A pb serialized instance
* @return An instance of {@link HBaseIntegerComparator} made from
* <code>bytes</code>
- * @throws DeserializationException
+ * @throws DeserializationException if deserialization of bytes to Protocol Buffers failed
* @see #toByteArray
*/
public static ByteArrayComparable parseFrom(final byte[] pbBytes)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
index b522479..459e97c 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseLookupTable.java
@@ -19,8 +19,9 @@ import java.util.Map;
* HBaseLookupTable will load a table's lookup information
* from HBase pxflookup table if exists.<br>
* This table holds mappings between HAWQ column names (key) and HBase column names (value).<br>
- * E.g. for an HBase table "hbase_table", mappings between HAWQ column names and HBase column name
- * "hawq1" -> "cf1:hbase1" and "hawq2" -> "cf1:hbase2" will be:<br>
+ * E.g. for an HBase table "hbase_table", mappings between HAWQ column names and HBase column names,
+ * when <code>"hawq1"</code> is mapped to <code>"cf1:hbase1"</code> and
+ * <code>"hawq2"</code> is mapped to <code>"cf1:hbase2"</code>, will be:<br>
* <pre>
* ROW COLUMN+CELL
* hbase_table column=mapping:hawq1, value=cf1:hbase1
@@ -47,6 +48,7 @@ public class HBaseLookupTable implements Closeable {
* Constructs a connector to HBase lookup table.
* Requires calling {@link #close()} to close {@link HBaseAdmin} instance.
*
+ * @param conf HBase configuration
* @throws IOException when initializing HBaseAdmin fails
*/
public HBaseLookupTable(Configuration conf) throws Exception {
@@ -93,6 +95,8 @@ public class HBaseLookupTable implements Closeable {
/**
* Returns true if {@link #LOOKUPTABLENAME} is available and enabled.
+ *
+ * @return whether lookup table is valid
*/
private boolean lookupTableValid() throws IOException {
return (HBaseUtilities.isTableAvailable(admin, LOOKUPTABLENAME) &&
@@ -101,6 +105,8 @@ public class HBaseLookupTable implements Closeable {
/**
* Returns true if {@link #LOOKUPTABLENAME} has {@value #LOOKUPCOLUMNFAMILY} family.
+ *
+ * @return whether lookup has expected column family name
*/
private boolean lookupHasCorrectStructure() throws IOException {
HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(LOOKUPTABLENAME));
@@ -109,6 +115,8 @@ public class HBaseLookupTable implements Closeable {
/**
* Loads table name mappings from {@link #LOOKUPTABLENAME} lookup table.
+ *
+ * @param tableName table name
*/
private void loadTableMappings(String tableName) throws IOException {
openLookupTable();
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
index e91ff8c..74f3ec3 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
@@ -23,7 +23,7 @@ public class HBaseTupleDescription {
/**
* Constructs tuple description of the HBase table.
- *
+ *
* @param conf data containing table tuple description
*/
public HBaseTupleDescription(InputData conf) {
@@ -33,6 +33,8 @@ public class HBaseTupleDescription {
/**
* Returns the number of fields.
+ *
+ * @return number of fields
*/
public int columns() {
return tupleDescription.size();
@@ -40,7 +42,7 @@ public class HBaseTupleDescription {
/**
* Returns the column description of index column.
- *
+ *
* @param index column index to be returned
* @return column description
*/
@@ -57,7 +59,7 @@ public class HBaseTupleDescription {
/**
* Loads user information from fragmenter.
* The data contains optional table mappings from the lookup table,
- * between field names in HAWQ table and in the HBase table.
+ * between field names in HAWQ table and in the HBase table.
*/
@SuppressWarnings("unchecked")
private void loadUserData() {
@@ -87,7 +89,7 @@ public class HBaseTupleDescription {
/**
* Returns the {@link #HBaseColumnDescriptor} for given column.
* If the column has a lookup table mapping, the HBase column name is used.
- *
+ *
* @param column HAWQ column description
* @return matching HBase column description
*/
@@ -108,7 +110,7 @@ public class HBaseTupleDescription {
/**
* Returns the HBase name mapping for the given column name.
- *
+ *
* @param column HAWQ column description
* @return HBase name for the column
*/
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
index 3edf07d..b338c02 100644
--- a/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
+++ b/pxf/pxf-hbase/src/main/java/com/pivotal/pxf/plugins/hbase/utilities/HBaseUtilities.java
@@ -31,7 +31,7 @@ public class HBaseUtilities {
* @param hbaseAdmin HBase admin, must be initialized
* @param tableName table name
* @return true if table exists
- * @throws IOException
+ * @throws IOException if a remote or network exception occurs when connecting to HBase
*/
public static boolean isTableAvailable(Admin hbaseAdmin, String tableName) throws IOException {
TableName name = TableName.valueOf(tableName);
@@ -42,9 +42,9 @@ public class HBaseUtilities {
/**
* Closes HBase admin and connection if they are open.
*
- * @param hbaseAdmin
- * @param hbaseConnection
- * @throws IOException
+ * @param hbaseAdmin HBase admin
+ * @param hbaseConnection HBase connection
+ * @throws IOException if an I/O error occurs when connecting to HBase
*/
public static void closeConnection(Admin hbaseAdmin, Connection hbaseConnection) throws IOException {
if (hbaseAdmin != null) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
index a496698..1a667f4 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
@@ -2,6 +2,7 @@ package com.pivotal.pxf.plugins.hdfs;
import com.pivotal.pxf.api.OneRow;
import com.pivotal.pxf.api.utilities.InputData;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.*;
@@ -21,10 +22,11 @@ public class AvroFileAccessor extends HdfsSplittableDataAccessor {
private AvroWrapper<GenericRecord> avroWrapper = null;
/**
- * Constructs a AvroFileAccessor that creates the job configuration and
+ * Constructs an AvroFileAccessor that creates the job configuration and
* accesses the avro file to fetch the avro schema
- *
+ *
* @param input all input parameters coming from the client
+ * @throws Exception if getting the avro schema fails
*/
public AvroFileAccessor(InputData input) throws Exception {
// 1. Call the base class
@@ -44,15 +46,15 @@ public class AvroFileAccessor extends HdfsSplittableDataAccessor {
@Override
protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
- return new AvroRecordReader(jobConf, (FileSplit) split);
+ return new AvroRecordReader<Object>(jobConf, (FileSplit) split);
}
/**
* readNextObject
- * The AVRO accessor is currently the only specialized accessor that
- * overrides this method. This happens, because the special
+ * The AVRO accessor is currently the only specialized accessor that
+ * overrides this method. This happens, because the special
* AvroRecordReader.next() semantics (use of the AvroWrapper), so it
- * cannot use the RecordReader's default implementation in
+ * cannot use the RecordReader's default implementation in
* SplittableFileAccessor
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
index 3560404..8b86479 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
@@ -9,6 +9,7 @@ import com.pivotal.pxf.api.utilities.Plugin;
import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException;
import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -38,7 +39,8 @@ import static com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema
public class AvroResolver extends Plugin implements ReadResolver {
private GenericRecord avroRecord = null;
private DatumReader<GenericRecord> reader = null;
- private BinaryDecoder decoder = null; // member kept to enable reuse, and thus avoid repeated allocation
+ // member kept to enable reuse, and thus avoid repeated allocation
+ private BinaryDecoder decoder = null;
private List<Schema.Field> fields = null;
private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
private static final String MAPKEY_DELIM = ":";
@@ -49,19 +51,19 @@ public class AvroResolver extends Plugin implements ReadResolver {
private String recordkeyDelim;
/**
- * Constructs an AvroResolver. Initializes Avro data structure: the avro
- * record - fields information and the avro record reader. All Avro data is
+ * Constructs an AvroResolver. Initializes Avro data structure: the Avro
+ * record - fields information and the Avro record reader. All Avro data is
* build from the Avro schema, which is based on the *.avsc file that was
* passed by the user
*
* @param input all input parameters coming from the client
- * @throws IOException
+ * @throws IOException if Avro schema could not be retrieved or parsed
*/
public AvroResolver(InputData input) throws IOException {
super(input);
- Schema schema = isAvroFile()
- ? getAvroSchema(new Configuration(), input.getDataSource())
+ Schema schema = isAvroFile() ? getAvroSchema(new Configuration(),
+ input.getDataSource())
: (new Schema.Parser()).parse(openExternalSchema());
reader = new GenericDatumReader<>(schema);
@@ -75,56 +77,66 @@ public class AvroResolver extends Plugin implements ReadResolver {
: input.getUserProperty("RECORDKEY_DELIM");
}
- /*
- * getFields returns a list of the fields of one record. Each record field
- * is represented by a OneField item. OneField item contains two fields:
- * an integer representing the field type and a Java Object representing
- * the field value.
+ /**
+ * Returns a list of the fields of one record. Each record field is
+ * represented by a OneField item. OneField item contains two fields: an
+ * integer representing the field type and a Java Object representing the
+ * field value.
*/
@Override
public List<OneField> getFields(OneRow row) throws Exception {
avroRecord = makeAvroRecord(row.getData(), avroRecord);
List<OneField> record = new LinkedList<OneField>();
- int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1 :
- inputData.getRecordkeyColumn().columnIndex();
+ int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1
+ : inputData.getRecordkeyColumn().columnIndex();
int currentIndex = 0;
for (Schema.Field field : fields) {
/*
* Add the record key if exists
- */
+ */
if (currentIndex == recordkeyIndex) {
- currentIndex += recordkeyAdapter.appendRecordkeyField(record, inputData, row);
+ currentIndex += recordkeyAdapter.appendRecordkeyField(record,
+ inputData, row);
}
- currentIndex += populateRecord(record, avroRecord.get(field.name()), field.schema());
+ currentIndex += populateRecord(record,
+ avroRecord.get(field.name()), field.schema());
}
return record;
}
- /*
- * Test if the Avro records are residing inside an AVRO file.
- * If the Avro records are not residing inside an AVRO file, then
- * they may reside inside a sequence file, regular file, ...
+ /**
+ * Tests if the Avro records are residing inside an AVRO file. If the Avro
+ * records are not residing inside an AVRO file, then they may reside inside
+ * a sequence file, regular file, ...
+ *
+ * @return whether the resource is an Avro file
*/
boolean isAvroFile() {
return inputData.getAccessor().toLowerCase().contains("avro");
}
- /*
+ /**
* The record can arrive from one out of two different sources: a sequence
* file or an AVRO file. If it comes from an AVRO file, then it was already
- * obtained as a GenericRecord when when it was fetched from the file with
- * the AvroRecorReader so in this case a cast is enough. On the other hand,
- * if the source is a sequence file, then the input parameter obj hides a
- * bytes [] buffer which is in fact one Avro record serialized. Here, we
- * build the Avro record from the flat buffer, using the AVRO API. Then
- * (for both cases) in the remaining functions we build a List<OneField>
- * record from the Avro record
+ * obtained as a {@link GenericRecord} when it was fetched from the
+ * file with the {@link AvroRecordReader} so in this case a cast is enough.
+ * On the other hand, if the source is a sequence file, then the input
+ * parameter obj hides a bytes [] buffer which is in fact one Avro record
+ * serialized. Here, we build the Avro record from the flat buffer, using
+ * the AVRO API. Then (for both cases) in the remaining functions we build a
+ * {@code List<OneField>} record from the Avro record.
+ *
+ * @param obj object holding an Avro record
+ * @param reuseRecord Avro record to be reused to create new record from obj
+ * @return Avro record
+ * @throws IOException if creating the Avro record from byte array failed
*/
- GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord) throws IOException {
+ GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord)
+ throws IOException {
if (isAvroFile()) {
return (GenericRecord) obj;
} else {
@@ -134,12 +146,18 @@ public class AvroResolver extends Plugin implements ReadResolver {
}
}
- /*
+ /**
* For a given field in the Avro record we extract its value and insert it
- * into the output List<OneField> record. An Avro field can be a primitive
- * type or an array type.
+ * into the output {@code List<OneField>} record. An Avro field can be a
+ * primitive type or an array type.
+ *
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param fieldSchema field schema
+ * @return the number of populated fields
*/
- int populateRecord(List<OneField> record, Object fieldValue, Schema fieldSchema) throws IllegalAccessException {
+ int populateRecord(List<OneField> record, Object fieldValue,
+ Schema fieldSchema) {
Schema.Type fieldType = fieldSchema.getType();
int ret = 0;
@@ -149,28 +167,38 @@ public class AvroResolver extends Plugin implements ReadResolver {
case ARRAY:
List<OneField> listRecord = new LinkedList<>();
ret = setArrayField(listRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("[%s]", HdfsUtilities.toString(listRecord, collectionDelim)));
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(listRecord, collectionDelim)));
break;
case MAP:
List<OneField> mapRecord = new LinkedList<>();
ret = setMapField(mapRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("{%s}", HdfsUtilities.toString(mapRecord, collectionDelim)));
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(mapRecord, collectionDelim)));
break;
case RECORD:
List<OneField> recRecord = new LinkedList<>();
ret = setRecordField(recRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("{%s}", HdfsUtilities.toString(recRecord, collectionDelim)));
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(recRecord, collectionDelim)));
break;
case UNION:
- /* When an Avro field is actually a union, we resolve the type of the union
- * element, and delegate the record update via recursion
- */
- int unionIndex = GenericData.get().resolveUnion(fieldSchema, fieldValue);
- /** Retrieve index of the non null data type from the type array if value is null */
+ /*
+ * When an Avro field is actually a union, we resolve the type
+ * of the union element, and delegate the record update via
+ * recursion
+ */
+ int unionIndex = GenericData.get().resolveUnion(fieldSchema,
+ fieldValue);
+ /**
+ * Retrieve index of the non null data type from the type array
+ * if value is null
+ */
if (fieldValue == null) {
unionIndex ^= 1;
}
- ret = populateRecord(record, fieldValue, fieldSchema.getTypes().get(unionIndex));
+ ret = populateRecord(record, fieldValue,
+ fieldSchema.getTypes().get(unionIndex));
break;
case ENUM:
value = (fieldValue != null) ? fieldValue : null;
@@ -185,7 +213,8 @@ public class AvroResolver extends Plugin implements ReadResolver {
ret = addOneFieldToRecord(record, FLOAT8, value);
break;
case STRING:
- value = (fieldValue != null) ? String.format("%s", fieldValue) : null;
+ value = (fieldValue != null) ? String.format("%s", fieldValue)
+ : null;
ret = addOneFieldToRecord(record, TEXT, value);
break;
case FLOAT:
@@ -214,59 +243,77 @@ public class AvroResolver extends Plugin implements ReadResolver {
return ret;
}
- /*
+ /**
* When an Avro field is actually a record, we iterate through each field
* for each entry, the field name and value are added to a local record
- * List<OneField> complexRecord with the necessary delimiter
- * we create an object of type OneField and insert it into the output List<OneField> record
- *
+ * {@code List<OneField>} complexRecord with the necessary delimiter we
+ * create an object of type OneField and insert it into the output
+ * {@code List<OneField>} record.
+ *
+ * @param record list of fields to be populated
+ * @param value field value
+ * @param recSchema record schema
+ * @return number of populated fields
*/
- int setRecordField(List<OneField> record, Object value, Schema recSchema) throws IllegalAccessException {
+ int setRecordField(List<OneField> record, Object value, Schema recSchema) {
- GenericRecord rec = ((GenericData.Record)value);
+ GenericRecord rec = ((GenericData.Record) value);
Schema fieldKeySchema = Schema.create(Schema.Type.STRING);
int currentIndex = 0;
- for(Schema.Field field: recSchema.getFields()) {
+ for (Schema.Field field : recSchema.getFields()) {
Schema fieldSchema = field.schema();
- Schema.Type fieldType = fieldSchema.getType();
Object fieldValue = rec.get(field.name());
List<OneField> complexRecord = new LinkedList<>();
populateRecord(complexRecord, field.name(), fieldKeySchema);
populateRecord(complexRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, HdfsUtilities.toString(complexRecord, recordkeyDelim));
+ addOneFieldToRecord(record, TEXT,
+ HdfsUtilities.toString(complexRecord, recordkeyDelim));
currentIndex++;
}
return currentIndex;
}
- /*
- * When an Avro field is actually a map, we resolve the type of the map value
- * For each entry, the field name and value are added to a local record
- * we create an object of type OneField and insert it into the output List<OneField> record
+ /**
+ * When an Avro field is actually a map, we resolve the type of the map
+ * value For each entry, the field name and value are added to a local
+ * record we create an object of type OneField and insert it into the output
+ * {@code List<OneField>} record.
+ *
* Unchecked warning is suppressed to enable us to cast fieldValue to a Map.
* (since the value schema has been identified to be of type map)
+ *
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param mapSchema map schema
+ * @return number of populated fields
*/
@SuppressWarnings("unchecked")
- int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) throws IllegalAccessException {
+ int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) {
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = mapSchema.getValueType();
- Map<String, ?> avroMap = ((Map<String, ?>)fieldValue);
+ Map<String, ?> avroMap = ((Map<String, ?>) fieldValue);
for (Map.Entry<String, ?> entry : avroMap.entrySet()) {
List<OneField> complexRecord = new LinkedList<>();
populateRecord(complexRecord, entry.getKey(), keySchema);
populateRecord(complexRecord, entry.getValue(), valueSchema);
- addOneFieldToRecord(record, TEXT, HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ addOneFieldToRecord(record, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
}
return avroMap.size();
}
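
The map case follows the same pattern per entry; a minimal sketch, assuming a plain java.util.Map (which is what the cast in setMapField expects) and an invented delimiter:

import java.util.LinkedHashMap;
import java.util.Map;

public class MapFlattenSketch {
    public static void main(String[] args) {
        Map<String, Integer> avroMap = new LinkedHashMap<>();
        avroMap.put("x", 1);
        avroMap.put("y", 2);

        char mapkeyDelim = ':'; // assumed delimiter
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> entry : avroMap.entrySet()) {
            // In setMapField each entry becomes one TEXT OneField built from
            // the key and value joined by mapkeyDelim.
            out.append(entry.getKey()).append(mapkeyDelim)
               .append(entry.getValue()).append(' ');
        }
        System.out.println(out.toString().trim()); // x:1 y:2
    }
}
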
- /*
+ /**
* When an Avro field is actually an array, we resolve the type of the array
* element, and for each element in the Avro array, we recursively invoke
- * the population of List<OneField> record
+ * the population of {@code List<OneField>} record.
*
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param arraySchema array schema
+ * @return number of populated fields
*/
- int setArrayField(List<OneField> record, Object fieldValue, Schema arraySchema) throws IllegalAccessException {
+ int setArrayField(List<OneField> record, Object fieldValue,
+ Schema arraySchema) {
Schema typeSchema = arraySchema.getElementType();
GenericData.Array<?> array = (GenericData.Array<?>) fieldValue;
int length = array.size();
@@ -276,21 +323,30 @@ public class AvroResolver extends Plugin implements ReadResolver {
return length;
}
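
For arrays, a similarly hedged sketch (array schema and elements invented) showing the element-schema lookup that drives the recursion described above:

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ArrayFieldSketch {
    public static void main(String[] args) {
        // Made-up int-array schema and data.
        Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.INT));
        GenericData.Array<Integer> array =
                new GenericData.Array<>(arraySchema, Arrays.asList(7, 8, 9));

        Schema elementSchema = arraySchema.getElementType();
        // setArrayField hands each element back to populateRecord together with
        // this element schema; here we only print what it would visit.
        for (Object element : array) {
            System.out.println(elementSchema.getType() + ": " + element);
        }
        System.out.println("fields populated: " + array.size());
    }
}
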
- /*
- * Creates the OneField object and adds it to the output List<OneField> record.
- * Strings and byte arrays are held inside special types in the Avro record so we
- * transfer them to standard types in order to enable their insertion in the
- * GPDBWritable instance
+ /**
+ * Creates the {@link OneField} object and adds it to the output {@code List<OneField>}
+ * record. Strings and byte arrays are held inside special types in the Avro
+ * record so we transfer them to standard types in order to enable their
+ * insertion in the GPDBWritable instance.
+ *
+ * @param record list of fields to be populated
+ * @param gpdbWritableType field type
+ * @param val field value
+ * @return 1 (number of populated fields)
*/
- int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType, Object val) {
+ int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType,
+ Object val) {
OneField oneField = new OneField();
oneField.type = gpdbWritableType.getOID();
switch (gpdbWritableType) {
case BYTEA:
- if(val instanceof ByteBuffer) {
+ if (val instanceof ByteBuffer) {
oneField.val = ((ByteBuffer) val).array();
} else {
- /** Entry point when the underlying bytearray is from a Fixed data */
+ /**
+ * Entry point when the underlying byte array comes from an Avro
+ * Fixed value
+ */
oneField.val = ((GenericData.Fixed) val).bytes();
}
break;
@@ -303,13 +359,23 @@ public class AvroResolver extends Plugin implements ReadResolver {
return 1;
}
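
The BYTEA branch above covers the two ways Avro surfaces binary data; this stand-alone sketch (invented schema and values, not the commit's code) shows the same ByteBuffer-versus-Fixed distinction:

import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ByteaSketch {
    // "bytes" values arrive as ByteBuffer, "fixed" values as GenericData.Fixed;
    // both are reduced to a plain byte[] as in the BYTEA case above.
    static byte[] toBytes(Object val) {
        if (val instanceof ByteBuffer) {
            return ((ByteBuffer) val).array();
        }
        return ((GenericData.Fixed) val).bytes();
    }

    public static void main(String[] args) {
        System.out.println(toBytes(ByteBuffer.wrap(new byte[] {1, 2, 3})).length); // 3

        Schema fixedSchema = Schema.createFixed("two_bytes", null, null, 2); // made-up name
        GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, new byte[] {9, 9});
        System.out.println(toBytes(fixed).length); // 2
    }
}
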
- InputStream openExternalSchema() throws IOException {
+ /**
+ * Opens Avro schema based on DATA-SCHEMA parameter.
+ *
+ * @return InputStream of schema file
+ * @throws DataSchemaException if schema file could not be opened
+ */
+ InputStream openExternalSchema() {
String schemaName = inputData.getUserProperty("DATA-SCHEMA");
- /** Testing that the schema name was supplied by the user - schema is an optional properly. */
+ /**
+ * Testing that the schema name was supplied by the user - schema is an
+ * optional property.
+ */
if (schemaName == null) {
- throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName());
+ throw new DataSchemaException(SCHEMA_NOT_INDICATED,
+ this.getClass().getName());
}
/** Testing that the schema resource exists. */
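
For context on openExternalSchema(), the sketch below shows the usual pattern of checking for an optional schema resource on the classpath and parsing it; the resource name is invented, and the IllegalArgumentException is a stand-in for the DataSchemaException the real method throws:

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;

public class ExternalSchemaSketch {
    public static void main(String[] args) throws IOException {
        String schemaName = "user_supplied.avsc"; // stands in for the DATA-SCHEMA user property
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader.getResource(schemaName) == null) {
            // The real code throws DataSchemaException here.
            throw new IllegalArgumentException(schemaName + " was not found on the classpath");
        }
        try (InputStream in = loader.getResourceAsStream(schemaName)) {
            Schema schema = new Schema.Parser().parse(in);
            System.out.println(schema.getFullName());
        }
    }
}
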
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
index 897085c..1873d7d 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
@@ -6,163 +6,171 @@ import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
/**
- * A class that provides a line reader from an input stream.
- * Lines are terminated by '\n' (LF)
- * EOF also terminates an otherwise unterminated line.
+ * A class that provides a line reader from an input stream. Lines are
+ * terminated by '\n' (LF); EOF also terminates an otherwise unterminated line.
*/
public class ChunkReader implements Closeable {
- public static final int DEFAULT_BUFFER_SIZE = 64*1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
- private static final byte LF = '\n';
-
- /**
- * Constructs a ChunkReader instance
- * @param in input stream
- */
- public ChunkReader(InputStream in) throws IOException {
- this.in = in;
- this.buffer = new byte[this.bufferSize];
- }
-
-
- /**
- * Close the underlying stream.
- */
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- /*
- * Internal class used for holding part of a chunk brought by one read()
- * operation on the input stream. We collect seveal such nodes in a list
- * by doing several read operation until we reach the chunk size - maxBytesToConsume
- */
- private class Node {
- /* part of a chunk brought in a single inputstream.read() operation */
- public byte [] slice;
- /* the size of the slice */
- public int len;
- }
-
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private InputStream in;
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer
+ private int bufferLength = 0;
+ // the current position in the buffer
+ private int bufferPosn = 0;
+ private static final byte LF = '\n';
+
+ /**
+ * Constructs a ChunkReader instance
+ *
+ * @param in input stream
+ */
+ public ChunkReader(InputStream in) {
+ this.in = in;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ /**
+ * Closes the underlying stream.
+ */
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /*
+ * Internal class used for holding part of a chunk brought by one read()
+ * operation on the input stream. We collect several such nodes in a list by
+ * doing several read operations until we reach the chunk size -
+ * maxBytesToConsume
+ */
+ private class Node {
+ /* part of a chunk brought in a single inputstream.read() operation */
+ public byte[] slice;
+ /* the size of the slice */
+ public int len;
+ }
+
+ /**
+ * Reads data from the input stream in buffer-sized pieces, until at least
+ * maxBytesToConsume bytes are consumed or EOF is reached.
+ *
+ * @param str - output parameter, will contain the read chunk byte array
+ * @param maxBytesToConsume - requested chunk size
+ * @return actual chunk size
+ * @throws IOException if the first byte cannot be read for any reason
+ * other than the end of the file, if the input stream has been closed,
+ * or if some other I/O error occurs.
+ */
+ public int readChunk(Writable str, int maxBytesToConsume) throws IOException
+ {
+ ChunkWritable cw = (ChunkWritable) str;
+ List<Node> list = new LinkedList<Node>();
+
+ long bytesConsumed = 0;
+
+ do {
+ if (bufferLength > 0) {
+ int remaining = bufferLength - bufferPosn;
+ Node nd = new Node();
+ nd.slice = new byte[remaining];
+ nd.len = remaining;
+ System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
+ list.add(nd);
+ bytesConsumed += nd.len;
+ } else {
+ Node nd = new Node();
+ nd.slice = new byte[buffer.length];
+ nd.len = in.read(nd.slice);
+ if (nd.len <= 0) {
+ break; // EOF
+ }
+ bytesConsumed += nd.len;
+ list.add(nd);
+ }
+
+ bufferLength = bufferPosn = 0;
+
+ } while (bytesConsumed < maxBytesToConsume);
+
+ if (list.size() > 0) {
+ cw.box = new byte[(int) bytesConsumed];
+ int pos = 0;
+ for (int i = 0; i < list.size(); i++) {
+ Node n = list.get(i);
+ System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+ pos += n.len;
+ }
+ }
+
+ return (int) bytesConsumed;
+ }
+
/**
- * Read data in chunks of DEFAULT_CHUNK_SIZE, until we reach maxBytesToConsume
- * @param str - output parameter, will contain the read chunk byte array
- * @param maxBytesToConsume - requested chunk size
- * @return actual chunk size
- */
- public int readChunk(Writable str, int maxBytesToConsume) throws IOException {
- ChunkWritable cw = (ChunkWritable)str;
- List<Node> list = new LinkedList<Node>();
-
- long bytesConsumed = 0;
-
- do {
- if (bufferLength > 0) {
- int remaining = bufferLength - bufferPosn;
- Node nd = new Node();
- nd.slice = new byte[remaining];
- nd.len = remaining;
- System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
- list.add(nd);
- bytesConsumed += nd.len;
- } else {
- Node nd = new Node();
- nd.slice = new byte[buffer.length];
- nd.len = in.read(nd.slice);
- if (nd.len <= 0) {
- break; // EOF
- }
- bytesConsumed += nd.len;
- list.add(nd);
- }
-
- bufferLength = bufferPosn = 0;
-
- } while (bytesConsumed < maxBytesToConsume);
-
- if (list.size() > 0) {
- cw.box = new byte[(int)bytesConsumed];
- int pos = 0;
- for (int i = 0; i < list.size(); i++) {
- Node n = list.get(i);
- System.arraycopy(n.slice, 0, cw.box, pos, n.len);
- pos += n.len;
- }
- }
-
- return (int)bytesConsumed;
- }
-
- /**
- * Read a line terminated by LF.
- * @param str - output parameter, will contain the read record
- * @param maxBytesToConsume - the line mustn't exceed this value
- * @return length of the line read
- */
- public int readLine(Writable str, int maxBytesToConsume) throws IOException {
- ChunkWritable cw = (ChunkWritable)str;
- List<Node> list = new LinkedList<Node>();
-
- boolean newLine = false; //length of terminating newline
- long bytesConsumed = 0;
-
- do
- {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
-
- bufferLength = in.read(buffer);
- if (bufferLength <= 0) {
- break; // EOF
- }
- }
-
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newLine = true;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- }
-
- int readLength = bufferPosn - startPosn;
- bytesConsumed += readLength;
-
- if (readLength > 0) {
- Node nd = new Node();
- nd.slice = new byte[readLength];
- nd.len = readLength;
- System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
- list.add(nd);
- }
- } while (!newLine && bytesConsumed < maxBytesToConsume);
-
- if (list.size() > 0) {
- cw.box = new byte[(int)bytesConsumed];
- int pos = 0;
- for (int i = 0; i < list.size(); i++)
- {
- Node n = list.get(i);
- System.arraycopy(n.slice, 0, cw.box, pos, n.len);
- pos += n.len;
- }
- }
-
- return (int)bytesConsumed;
- }
+ * Reads a line terminated by LF.
+ *
+ * @param str - output parameter, will contain the read record
+ * @param maxBytesToConsume - the line mustn't exceed this value
+ * @return length of the line read
+ * @throws IOException if the first byte cannot be read for any reason
+ * other than the end of the file, if the input stream has been closed,
+ * or if some other I/O error occurs.
+ */
+ public int readLine(Writable str, int maxBytesToConsume) throws IOException {
+ ChunkWritable cw = (ChunkWritable) str;
+ List<Node> list = new LinkedList<Node>();
+
+ boolean newLine = false; // whether a terminating newline was found
+ long bytesConsumed = 0;
+
+ do {
+ int startPosn = bufferPosn; // starting from where we left off the
+ // last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+
+ bufferLength = in.read(buffer);
+ if (bufferLength <= 0) {
+ break; // EOF
+ }
+ }
+
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
+ // newline
+ if (buffer[bufferPosn] == LF) {
+ newLine = true;
+ ++bufferPosn; // at next invocation proceed from following
+ // byte
+ break;
+ }
+ }
+
+ int readLength = bufferPosn - startPosn;
+ bytesConsumed += readLength;
+
+ if (readLength > 0) {
+ Node nd = new Node();
+ nd.slice = new byte[readLength];
+ nd.len = readLength;
+ System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
+ list.add(nd);
+ }
+ } while (!newLine && bytesConsumed < maxBytesToConsume);
+
+ if (list.size() > 0) {
+ cw.box = new byte[(int) bytesConsumed];
+ int pos = 0;
+ for (int i = 0; i < list.size(); i++) {
+ Node n = list.get(i);
+ System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+ pos += n.len;
+ }
+ }
+
+ return (int) bytesConsumed;
+ }
}
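
Finally, a hedged usage sketch for the reader: the sample input is invented, and it assumes ChunkWritable exposes its box byte array as the code above implies. readLine() returns each LF-terminated line (the length includes the trailing LF) and returns 0 at EOF; readChunk() follows the same call pattern but consumes at least maxBytesToConsume bytes per call.

import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.pivotal.pxf.plugins.hdfs.ChunkReader;
import com.pivotal.pxf.plugins.hdfs.ChunkWritable;

public class ChunkReaderUsageSketch {
    public static void main(String[] args) throws IOException {
        byte[] data = "alpha\nbeta\ngamma".getBytes(); // last line deliberately unterminated
        try (ChunkReader reader = new ChunkReader(new ByteArrayInputStream(data))) {
            ChunkWritable line = new ChunkWritable();
            int len;
            while ((len = reader.readLine(line, 1024)) > 0) {
                // prints 6 ("alpha\n"), 5 ("beta\n"), 5 ("gamma")
                System.out.print(len + " -> " + new String(line.box));
            }
        }
    }
}
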