You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:38 UTC

[40/47] incubator-carbondata git commit: [bug]fix bug of duplicate rows in UnivocityCsvParser (#877)

[bug]fix bug of duplicate rows in UnivocityCsvParser (#877)

* fix bug of duplicate rows in UnivocityCsvParser

* test 10 million


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/87f0deca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/87f0deca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/87f0deca

Branch: refs/heads/master
Commit: 87f0deca35981e890b2d89565009dbde3bce4610
Parents: 2d50d5c
Author: david <qi...@qq.com>
Authored: Fri Jul 29 17:42:56 2016 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Fri Jul 29 15:12:56 2016 +0530

----------------------------------------------------------------------
 .../spark/util/GlobalDictionaryUtil.scala       |   4 +-
 .../dataload/TestLoadDataWithJunkChars.scala    |  43 +++++
 .../csvreaderstep/CustomDataStream.java         | 108 +++++++++++++
 .../processing/csvreaderstep/CustomReader.java  | 157 -------------------
 .../csvreaderstep/UnivocityCsvParser.java       |  13 +-
 5 files changed, 156 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index ff7e360..0e15803 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -297,8 +297,8 @@ object GlobalDictionaryUtil extends Logging {
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
-    val hdfstemplocation = CarbonProperties.getInstance.
-                      getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)
+    val hdfstemplocation = CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
     val lockType = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
     val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
new file mode 100644
index 0000000..5472e7a
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
@@ -0,0 +1,43 @@
+package org.carbondata.integration.spark.testsuite.dataload;
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+
+import org.scalatest.BeforeAndAfterAll
+import java.io.File
+import java.io.BufferedWriter
+import java.io.FileWriter
+import java.util.Random
+import org.apache.spark.sql.Row
+
+class TestLoadDataWithJunkChars extends QueryTest with BeforeAndAfterAll {
+  var filePath = ""
+  val junkchars = "\u01cd\u01ce\u01cf\u01d0\u01d1\u01d2\u01d3\u01d4\u01d5\u01d6\u01d7\u01d8\u01d9\u01da\u01db\u01dc\u01dd\u01de\u01df\u01e0\u01e1\u01e2\u01e3\u01e4\u01e5\u01e6\u01e7\u01e8\u01e9\u01ea\u01eb\u01ec\u01ed\u01ee\u01ef\u01f0"
+
+  def buildTestData() = {
+    val pwd = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath
+    filePath = pwd + "/target/junkcharsdata.csv"
+    val file = new File(filePath)
+    val writer = new BufferedWriter(new FileWriter(file))
+    writer.write("c1,c2\n")
+    val random = new Random
+    for (i <- 1 until 1000000) {
+      writer.write("a" + i + "," + junkchars + "\n")
+    }
+    writer.write("a1000000," + junkchars)
+    writer.close
+  }
+
+  test("[bug]fix bug of duplicate rows in UnivocityCsvParser #877") {
+    buildTestData()
+    sql("drop table if exists junkcharsdata")
+    sql("""create table if not exists junkcharsdata
+             (c1 string, c2 string)
+             STORED BY 'org.apache.carbondata.format'""")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' into table junkcharsdata").show
+    sql("select * from junkcharsdata").show(20,false)
+    checkAnswer(sql("select count(*) from junkcharsdata"), Seq(Row(1000000)))
+    sql("drop table if exists junkcharsdata")
+    new File(filePath).delete()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
new file mode 100644
index 0000000..9c18eba
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
@@ -0,0 +1,108 @@
+package org.carbondata.processing.csvreaderstep;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Custom input stream that reads the data from a file. It takes care of reading
+ * only up to the limit assigned to this class.
+ */
+public class CustomDataStream extends InputStream {
+
+  /**
+   * byte value of the new line character
+   */
+  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+  /**
+   * number of extra bytes to read past the limit
+   */
+  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+  /**
+   * number of bytes remaining
+   */
+  private long remaining;
+  /**
+   * to check whether end of line is found
+   */
+  private boolean endOfLineFound = false;
+
+  private DataInputStream in;
+
+  public CustomDataStream(DataInputStream in, long limit) {
+    this.in = in;
+    this.remaining = limit;
+  }
+
+  /**
+   * Below method will be used to read the data from file
+   *
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read() throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      int var1 = this.in.read();
+      if (var1 >= 0) {
+        --this.remaining;
+      }
+
+      return var1;
+    }
+  }
+
+  /**
+   * Below method will be used to read the data from the file. If the limit is
+   * reached, it will keep reading until a new line character is reached
+   *
+   * @param buffer
+   *          buffer into which the data will be read
+   * @param offset
+   *          position in the buffer from which it will be filled
+   * @param length
+   *          maximum number of bytes to be read
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      if (this.remaining < length) {
+        length = (int) this.remaining;
+      }
+
+      length = this.in.read(buffer, offset, length);
+      if (length >= 0) {
+        this.remaining -= length;
+        if (this.remaining == 0 && !endOfLineFound) {
+          endOfLineFound = true;
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        } else if (endOfLineFound) {
+          int end = offset + length;
+          for (int i = offset; i < end; i++) {
+            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+              this.remaining = 0;
+              return (i - offset) + 1;
+            }
+          }
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        }
+      }
+      return length;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
deleted file mode 100644
index 49c5b15..0000000
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.processing.csvreaderstep;
-
-import java.io.IOException;
-import java.io.Reader;
-
-/**
- * Custom reader class to read the data from file
- * it will take care of reading till the limit assigned to  this
- * class
- */
-public final class CustomReader extends Reader {
-
-  /**
-   * byte value of the new line character
-   */
-  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
-  /**
-   * number of extra character to read
-   */
-  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
-  /**
-   * number of bytes remaining
-   */
-  private long remaining;
-
-  /**
-   * reader
-   */
-  private Reader reader;
-
-  /**
-   * to check whether end of line is found
-   */
-  private boolean endOfLineFound = false;
-
-  public CustomReader(Reader var1) {
-    this.reader = var1;
-  }
-
-  /**
-   * Below method will  be used to read the data from file
-   *
-   * @throws IOException problem while reading
-   */
-  public int read() throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      int var1 = this.reader.read();
-      if (var1 >= 0) {
-        --this.remaining;
-      }
-
-      return var1;
-    }
-  }
-
-  /**
-   * Below method will be used to read the data from file.
-   * If limit reaches in that case it will read until new line character is reached
-   *
-   * @param buffer buffer in which data will be read
-   * @param offset from position to buffer will be filled
-   * @param length number of character to be read
-   * @throws IOException problem while reading
-   */
-  public int read(char[] buffer, int offset, int length) throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      if (this.remaining < length) {
-        length = (int) this.remaining;
-      }
-
-      length = this.reader.read(buffer, offset, length);
-      if (length >= 0) {
-        this.remaining -= length;
-        if (this.remaining == 0 && !endOfLineFound) {
-          if (buffer[length - 1] != END_OF_LINE_BYTE_VALUE) {
-            endOfLineFound = true;
-            this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-          }
-        } else if (endOfLineFound) {
-          for (int i = 0; i < length; i++) {
-            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
-              this.remaining = 0;
-              return i + 1;
-            }
-          }
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        }
-      }
-      return length;
-    }
-  }
-
-  /**
-   * number of bytes to skip
-   */
-  public long skip(long numberOfCharacterToSkip) throws IOException {
-    throw new UnsupportedOperationException("Not supported");
-  }
-
-  /**
-   * to set the stream position
-   */
-  public void mark(int readAheadLimit) throws IOException {
-    throw new UnsupportedOperationException("Not supported");
-  }
-
-  /**
-   * to close the stream
-   */
-  public void close() throws IOException {
-    this.reader.close();
-  }
-
-  /**
-   * Max number of bytes that can be read by the stream
-   *
-   * @param limit
-   */
-  public void setLimit(long limit) {
-    this.remaining = limit;
-  }
-
-  /**
-   * number of remaining bytes
-   *
-   * @return number of remaining bytes
-   */
-  public final long getRemaining() {
-    return this.remaining;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index 8359576..9594a24 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -119,12 +119,7 @@ public class UnivocityCsvParser {
     // calculate the end offset the block
     long endOffset =
         this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength() + startOffset;
-    // if start offset is not 0 then we need to set the offset to the start of a line
-    // so if offset is not zero we are setting to -1 so to check if current position itself is
-    // start of the block so -1 will ensure whether last character is new line character or not
-    if (startOffset != 0) {
-      startOffset -= 1;
-    }
+
     // create a input stream for the block
     DataInputStream dataInputStream = FileFactory
         .getDataInputStream(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath(),
@@ -134,10 +129,8 @@ public class UnivocityCsvParser {
       LineReader lineReader = new LineReader(dataInputStream, 1);
       startOffset += lineReader.readLine(new Text(), 0);
     }
-    CustomReader reader =
-        new CustomReader(new BufferedReader(new InputStreamReader(dataInputStream)));
-    reader.setLimit(endOffset - startOffset);
-    inputStreamReader = reader;
+    inputStreamReader = new BufferedReader(new InputStreamReader(
+        new CustomDataStream(dataInputStream, endOffset - startOffset)));
   }
 
   /**