You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:38 UTC
[40/47] incubator-carbondata git commit: [bug]fix bug of duplicate
rows in UnivocityCsvParser (#877)
[bug]fix bug of duplicate rows in UnivocityCsvParser (#877)
* fix bug of duplicate rows in UnivocityCsvParser
* test 10 million
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/87f0deca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/87f0deca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/87f0deca
Branch: refs/heads/master
Commit: 87f0deca35981e890b2d89565009dbde3bce4610
Parents: 2d50d5c
Author: david <qi...@qq.com>
Authored: Fri Jul 29 17:42:56 2016 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Fri Jul 29 15:12:56 2016 +0530
----------------------------------------------------------------------
.../spark/util/GlobalDictionaryUtil.scala | 4 +-
.../dataload/TestLoadDataWithJunkChars.scala | 43 +++++
.../csvreaderstep/CustomDataStream.java | 108 +++++++++++++
.../processing/csvreaderstep/CustomReader.java | 157 -------------------
.../csvreaderstep/UnivocityCsvParser.java | 13 +-
5 files changed, 156 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index ff7e360..0e15803 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -297,8 +297,8 @@ object GlobalDictionaryUtil extends Logging {
val dictFilePaths = dictDetail.dictFilePaths
val dictFileExists = dictDetail.dictFileExists
val columnIdentifier = dictDetail.columnIdentifiers
- val hdfstemplocation = CarbonProperties.getInstance.
- getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)
+ val hdfstemplocation = CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
val lockType = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
new file mode 100644
index 0000000..5472e7a
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithJunkChars.scala
@@ -0,0 +1,43 @@
+package org.carbondata.integration.spark.testsuite.dataload;
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+
+import org.scalatest.BeforeAndAfterAll
+import java.io.File
+import java.io.BufferedWriter
+import java.io.FileWriter
+import java.util.Random
+import org.apache.spark.sql.Row
+
+class TestLoadDataWithJunkChars extends QueryTest with BeforeAndAfterAll {
+ var filePath = "" // assigned by buildTestData()
+ val junkchars = "\u01cd\u01ce\u01cf\u01d0\u01d1\u01d2\u01d3\u01d4\u01d5\u01d6\u01d7\u01d8\u01d9\u01da\u01db\u01dc\u01dd\u01de\u01df\u01e0\u01e1\u01e2\u01e3\u01e4\u01e5\u01e6\u01e7\u01e8\u01e9\u01ea\u01eb\u01ec\u01ed\u01ee\u01ef\u01f0"
+
+ def buildTestData() = { // writes the CSV file that the test below loads
+ val pwd = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath
+ filePath = pwd + "/target/junkcharsdata.csv"
+ val file = new File(filePath)
+ val writer = new BufferedWriter(new FileWriter(file))
+ writer.write("c1,c2\n") // header line
+ val random = new Random // NOTE(review): never used in this class
+ for (i <- 1 until 1000000) { // `until` is exclusive: rows a1 .. a999999, each ending in "\n"
+ writer.write("a" + i + "," + junkchars + "\n")
+ }
+ writer.write("a1000000," + junkchars) // last row is written without a trailing newline
+ writer.close
+ }
+
+ test("[bug]fix bug of duplicate rows in UnivocityCsvParser #877") {
+ buildTestData()
+ sql("drop table if exists junkcharsdata")
+ sql("""create table if not exists junkcharsdata
+ (c1 string, c2 string)
+ STORED BY 'org.apache.carbondata.format'""")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' into table junkcharsdata").show
+ sql("select * from junkcharsdata").show(20,false)
+ checkAnswer(sql("select count(*) from junkcharsdata"), Seq(Row(1000000))) // 999999 + 1 rows written above
+ sql("drop table if exists junkcharsdata")
+ new File(filePath).delete() // remove the generated CSV
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
new file mode 100644
index 0000000..9c18eba
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomDataStream.java
@@ -0,0 +1,108 @@
+package org.carbondata.processing.csvreaderstep;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * InputStream wrapper that serves at most {@code limit} bytes of the wrapped stream and then,
+ * in the bulk read, extends that limit 100 bytes at a time until a '\n' has been returned.
+ */
+public class CustomDataStream extends InputStream {
+
+ /**
+ * Byte value of the line-feed character that ends the extra read.
+ */
+ private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+ /**
+ * Number of bytes added to the budget each time the line feed has not been found yet.
+ */
+ private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+ /**
+ * Number of bytes that may still be read; 0 makes both read methods return -1.
+ */
+ private long remaining;
+ /**
+ * Set once the original limit is used up; later bulk reads then scan for a line feed.
+ */
+ private boolean endOfLineFound = false;
+
+ private DataInputStream in; // wrapped stream; closed by close()
+
+ public CustomDataStream(DataInputStream in, long limit) { // limit: initial byte budget
+ this.in = in;
+ this.remaining = limit; // NOTE(review): a negative limit never equals 0, so the guards below do not stop on it
+ }
+
+ /**
+ * Reads a single byte while the budget is non-zero; this path never extends the budget.
+ * Returns the byte read, or -1 when the budget is 0 or the wrapped stream has ended.
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read() throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ int var1 = this.in.read();
+ if (var1 >= 0) { // only bytes actually read count against the budget
+ --this.remaining;
+ }
+
+ return var1;
+ }
+ }
+
+ /**
+ * Reads up to {@code length} bytes, capped by the remaining budget. Once the budget is used up
+ * it is extended by 100 bytes per call until a read contains '\n'; that read is cut after it.
+ * Returns the number of bytes the caller should consume, or -1 when nothing more is served.
+ * @param buffer
+ * buffer in which data will be read
+ * @param offset
+ * position in the buffer at which filling starts
+ * @param length
+ * maximum number of bytes to read
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ if (this.remaining < length) { // never request more than the current budget
+ length = (int) this.remaining;
+ }
+
+ length = this.in.read(buffer, offset, length); // reused: now the count actually read, -1 at end of stream
+ if (length >= 0) {
+ this.remaining -= length;
+ if (this.remaining == 0 && !endOfLineFound) { // original limit has just been used up
+ endOfLineFound = true; // despite the name: marks the start of the line-feed search
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+ } else if (endOfLineFound) {
+ int end = offset + length;
+ for (int i = offset; i < end; i++) { // scan only the bytes read by this call
+ if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+ this.remaining = 0; // line completed: the next call returns -1
+ return (i - offset) + 1; // count up to and including the line feed
+ }
+ }
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ; // no line feed yet: allow another 100 bytes
+ }
+ }
+ return length;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) { // tolerate a null wrapped stream
+ in.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
deleted file mode 100644
index 49c5b15..0000000
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CustomReader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.processing.csvreaderstep;
-
-import java.io.IOException;
-import java.io.Reader;
-
-/**
- * Custom reader class to read the data from file
- * it will take care of reading till the limit assigned to this
- * class
- */
-public final class CustomReader extends Reader {
-
- /**
- * byte value of the new line character
- */
- private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
- /**
- * number of extra character to read
- */
- private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
- /**
- * number of bytes remaining
- */
- private long remaining;
-
- /**
- * reader
- */
- private Reader reader;
-
- /**
- * to check whether end of line is found
- */
- private boolean endOfLineFound = false;
-
- public CustomReader(Reader var1) {
- this.reader = var1;
- }
-
- /**
- * Below method will be used to read the data from file
- *
- * @throws IOException problem while reading
- */
- public int read() throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- int var1 = this.reader.read();
- if (var1 >= 0) {
- --this.remaining;
- }
-
- return var1;
- }
- }
-
- /**
- * Below method will be used to read the data from file.
- * If limit reaches in that case it will read until new line character is reached
- *
- * @param buffer buffer in which data will be read
- * @param offset from position to buffer will be filled
- * @param length number of character to be read
- * @throws IOException problem while reading
- */
- public int read(char[] buffer, int offset, int length) throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- if (this.remaining < length) {
- length = (int) this.remaining;
- }
-
- length = this.reader.read(buffer, offset, length);
- if (length >= 0) {
- this.remaining -= length;
- if (this.remaining == 0 && !endOfLineFound) {
- if (buffer[length - 1] != END_OF_LINE_BYTE_VALUE) {
- endOfLineFound = true;
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- }
- } else if (endOfLineFound) {
- for (int i = 0; i < length; i++) {
- if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
- this.remaining = 0;
- return i + 1;
- }
- }
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- }
- }
- return length;
- }
- }
-
- /**
- * number of bytes to skip
- */
- public long skip(long numberOfCharacterToSkip) throws IOException {
- throw new UnsupportedOperationException("Not supported");
- }
-
- /**
- * to set the stream position
- */
- public void mark(int readAheadLimit) throws IOException {
- throw new UnsupportedOperationException("Not supported");
- }
-
- /**
- * to close the stream
- */
- public void close() throws IOException {
- this.reader.close();
- }
-
- /**
- * Max number of bytes that can be read by the stream
- *
- * @param limit
- */
- public void setLimit(long limit) {
- this.remaining = limit;
- }
-
- /**
- * number of remaining bytes
- *
- * @return number of remaining bytes
- */
- public final long getRemaining() {
- return this.remaining;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87f0deca/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index 8359576..9594a24 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -119,12 +119,7 @@ public class UnivocityCsvParser {
// calculate the end offset the block
long endOffset =
this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength() + startOffset;
- // if start offset is not 0 then we need to set the offset to the start of a line
- // so if offset is not zero we are setting to -1 so to check if current position itself is
- // start of the block so -1 will ensure whether last character is new line character or not
- if (startOffset != 0) {
- startOffset -= 1;
- }
+
// create a input stream for the block
DataInputStream dataInputStream = FileFactory
.getDataInputStream(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath(),
@@ -134,10 +129,8 @@ public class UnivocityCsvParser {
LineReader lineReader = new LineReader(dataInputStream, 1);
startOffset += lineReader.readLine(new Text(), 0);
}
- CustomReader reader =
- new CustomReader(new BufferedReader(new InputStreamReader(dataInputStream)));
- reader.setLimit(endOffset - startOffset);
- inputStreamReader = reader;
+ inputStreamReader = new BufferedReader(new InputStreamReader(
+ new CustomDataStream(dataInputStream, endOffset - startOffset)));
}
/**