You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:33:43 UTC

incubator-asterixdb-hyracks git commit: Improvement on Cursor for Delimited Data

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 84790762f -> f6b9f1f65


Improvement on Cursor for Delimited Data

This change allows the parser to parse records in addition to streams.

Change-Id: I84ff40db664633c633277e9cc0ffa534cda9f26a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/567
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/f6b9f1f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/f6b9f1f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/f6b9f1f6

Branch: refs/heads/master
Commit: f6b9f1f65390794b51eba39913882a9511016d34
Parents: 8479076
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat Jan 2 12:55:12 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Sun Jan 3 09:27:44 2016 -0800

----------------------------------------------------------------------
 .../dataflow/value/ISerializerDeserializer.java |  2 +-
 .../file/FieldCursorForDelimitedDataParser.java | 78 +++++++++++---------
 .../hyracks/hdfs/scheduler/Scheduler.java       | 10 +--
 pom.xml                                         |  3 +
 4 files changed, 52 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6b9f1f6/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
index 7f287cc..a93de4c 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ISerializerDeserializer.java
@@ -43,7 +43,7 @@ public interface ISerializerDeserializer<T> extends Serializable {
      *            - Stream to write data to.
      */
     public void serialize(T instance, DataOutput out) throws HyracksDataException;
-    
+
     /*
      * TODO: Add a new method:
      * T deserialize(DataInput in, T mutable)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6b9f1f6/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index e7f53bc..3d96224 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -25,45 +25,48 @@ import java.util.Arrays;
 public class FieldCursorForDelimitedDataParser {
 
     private enum State {
-        INIT,
-        IN_RECORD,
-        EOR,
-        CR,
-        EOF
+        INIT, //initial state
+        IN_RECORD, //cursor is inside record
+        EOR, //cursor is at end of record
+        CR, //cursor at carriage return
+        EOF //end of stream reached
     }
 
-    // public variables will be used by delimited data parser
-    public char[] buffer;
-    public int fStart;
-    public int fEnd;
-    public int recordCount;
-    public int fieldCount;
-    public int doubleQuoteCount;
-    public boolean isDoubleQuoteIncludedInThisField;
+    public char[] buffer; //buffer that holds the input coming from the underlying input stream
+    public int fStart; //start position for field
+    public int fEnd; //end position for field
+    public int recordCount; //count of records
+    public int fieldCount; //count of fields in current record
+    public int doubleQuoteCount; //count of double quotes
+    public boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
 
-    private static final int INITIAL_BUFFER_SIZE = 4096;
-    private static final int INCREMENT = 4096;
+    private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size
+    private static final int INCREMENT = 4096; //increment size
 
-    private final Reader in;
+    private Reader in; //the underlying reader
 
-    private int start;
-    private int end;
-    private State state;
+    private int start; //start of valid buffer area
+    private int end; //end of valid buffer area
+    private State state; //state (see states above)
 
-    private int lastQuotePosition;
-    private int lastDoubleQuotePosition;
-    private int lastDelimiterPosition;
-    private int quoteCount;
-    private boolean startedQuote;
+    private int lastQuotePosition; //position of last quote
+    private int lastDoubleQuotePosition; //position of last double quote
+    private int lastDelimiterPosition; //position of last delimiter
+    private int quoteCount; //count of single quotes
+    private boolean startedQuote; //whether a quote has been started
 
-    private char quote;
-    private char fieldDelimiter;
+    private char quote; //the quote character
+    private char fieldDelimiter; //the delimiter
 
     public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) {
         this.in = in;
-        buffer = new char[INITIAL_BUFFER_SIZE];
+        if (in != null) {
+            buffer = new char[INITIAL_BUFFER_SIZE];
+            end = 0;
+        } else {
+            end = Integer.MAX_VALUE;
+        }
         start = 0;
-        end = 0;
         state = State.INIT;
         this.quote = quote;
         this.fieldDelimiter = fieldDelimiter;
@@ -78,6 +81,15 @@ public class FieldCursorForDelimitedDataParser {
         fieldCount = 0;
     }
 
+    public void nextRecord(char[] buffer, int recordLength) throws IOException {
+        recordCount++;
+        fieldCount = 0;
+        start = 0;
+        end = recordLength;
+        state = State.IN_RECORD;
+        this.buffer = buffer;
+    }
+
     public boolean nextRecord() throws IOException {
         recordCount++;
         fieldCount = 0;
@@ -224,12 +236,8 @@ public class FieldCursorForDelimitedDataParser {
                                 startedQuote = true;
                             } else {
                                 // In this case, we don't have a quote in the beginning of a field.
-                                throw new IOException(
-                                        "At record: "
-                                                + recordCount
-                                                + ", field#: "
-                                                + fieldCount
-                                                + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+                                throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+                                        + " - a quote enclosing a field needs to be placed in the beginning of that field.");
                             }
                         }
                         // Check double quotes - "". We check [start != p-2]
@@ -362,4 +370,4 @@ public class FieldCursorForDelimitedDataParser {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6b9f1f6/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index fa6ed72..c28c740 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -34,7 +34,6 @@ import java.util.logging.Logger;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -48,7 +47,6 @@ import org.apache.hyracks.hdfs.api.INcCollectionBuilder;
  * The scheduler conduct data-local scheduling for data reading on HDFS. This
  * class works for Hadoop old API.
  */
-@SuppressWarnings("deprecation")
 public class Scheduler {
     private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
 
@@ -75,6 +73,7 @@ public class Scheduler {
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
+
     public Scheduler(String ipAddress, int port) throws HyracksException {
         try {
             IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
@@ -127,7 +126,8 @@ public class Scheduler {
      *            the hyracks cluster toplogy
      * @throws HyracksException
      */
-    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology) throws HyracksException {
+    public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
+            throws HyracksException {
         this(ncNameToNcInfos);
         this.ncCollectionBuilder = topology == null ? new IPProximityNcCollectionBuilder()
                 : new RackAwareNcCollectionBuilder(topology);
@@ -274,8 +274,8 @@ public class Scheduler {
      * @throws UnknownHostException
      */
     private void scheduleLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slots, Random random,
-            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits) throws IOException,
-            UnknownHostException {
+            boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
+                    throws IOException, UnknownHostException {
         /** scheduling candidates will be ordered inversely according to their popularity */
         PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f6b9f1f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fbe936d..90f80a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,9 @@
            <exclude>**/target/**</exclude>
            <exclude>**/output/**</exclude>
            <exclude>**/*.iml</exclude>
+           <exclude>**/*.prefs</exclude>
+           <exclude>**/.classpath</exclude>
+           <exclude>**/.project</exclude>
       </excludes>
         </configuration>
       </plugin>