You are viewing a plain-text version of this message; the canonical version, with working links, is the original post in the commits@carbondata.apache.org mailing-list archive.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:00 UTC
[12/56] [abbrv] incubator-carbondata git commit: Revert "[Issue -
626] csv file having some rows not in proper format is not treating them to
the bad records." (#687)
Revert "[Issue - 626] csv file having some rows not in proper format is not treating them to the bad records." (#687)
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6c2469db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6c2469db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6c2469db
Branch: refs/heads/master
Commit: 6c2469db288a7df59284c7a13aaed1d0b417edef
Parents: 075dd92
Author: Venkata Ramana G <g....@gmail.com>
Authored: Fri Jun 17 22:58:00 2016 +0530
Committer: GitHub <no...@github.com>
Committed: Fri Jun 17 22:58:00 2016 +0530
----------------------------------------------------------------------
.../src/test/resources/InvalidCsvFormatdata.csv | 3 -
.../csvreaderstep/BlockDataHandler.java | 576 +++++++++----------
.../processing/csvreaderstep/CsvInput.java | 10 +-
.../processing/csvreaderstep/CsvInputMeta.java | 66 +--
.../graphgenerator/GraphGenerator.java | 3 +-
.../csvbased/CarbonCSVBasedSeqGenStep.java | 22 +-
.../util/CarbonDataProcessorUtil.java | 34 +-
7 files changed, 318 insertions(+), 396 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/InvalidCsvFormatdata.csv b/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
deleted file mode 100644
index d61dd50..0000000
--- a/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-ID,date,country,name,phonetype,serialname,salary
-1,2015/7/23,china,aaa1,phone197,ASD69643,15000
-2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
index d832504..29b4a54 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
@@ -27,8 +27,6 @@ import java.util.List;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.datastorage.store.impl.FileFactory;
import org.carbondata.core.load.BlockDetails;
-import org.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
-import org.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.commons.vfs.FileObject;
import org.pentaho.di.core.exception.KettleConversionException;
@@ -54,21 +52,19 @@ public class BlockDataHandler {
public int bufferSize;
public long bytesToSkipInFirstFile;
public long totalBytesRead;
- public CsvInputMeta meta;
- public CsvInputData data;
+ public CsvInputMeta meta ;
+ public CsvInputData data ;
public TransMeta transMeta;
public boolean isNeedToSkipFirstLineInBlock;
public long currentOffset;
protected InputStream bufferedInputStream;
- protected BadRecordslogger badRecordslogger;
- private String badRecordFileName;
+
public BlockDataHandler() {
byteBuffer = new byte[] {};
isNeedToSkipFirstLineInBlock = true;
currentOffset = 0;
-
}
// Resize
@@ -202,12 +198,7 @@ public class BlockDataHandler {
// Don't skip again in the next file...
this.bytesToSkipInFirstFile = -1L;
- String key = meta.getDatabaseName() + '/' + meta.getTableName() + '_' + meta.getTableName();
- badRecordFileName = transMeta.getVariable("csvInputFilePath");
- badRecordFileName = null != badRecordFileName ? badRecordFileName : meta.getTableName();
- badRecordFileName = CarbonDataProcessorUtil.getBagLogFileName(badRecordFileName);
- badRecordslogger = new BadRecordslogger(key, badRecordFileName, CarbonDataProcessorUtil
- .getBadLogStoreLocation(meta.getDatabaseName() + '/' + meta.getTableName()));
+
return true;
} catch (KettleException e) {
throw e;
@@ -215,12 +206,11 @@ public class BlockDataHandler {
throw new KettleException(e);
}
}
-
protected void initializeFileReader(FileObject fileObject) throws IOException {
//using file object to get path can return a valid path which for new inputstream
String filePath = KettleVFS.getFilename(fileObject);
- this.bufferedInputStream = FileFactory
- .getDataInputStream(filePath, FileFactory.getFileType(filePath), data.preferredBufferSize);
+ this.bufferedInputStream = FileFactory.getDataInputStream(filePath,
+ FileFactory.getFileType(filePath), data.preferredBufferSize);
//when open a new file, need to initialize all info
this.byteBuffer = new byte[data.preferredBufferSize];
this.bufferSize = 0;
@@ -228,16 +218,14 @@ public class BlockDataHandler {
this.endBuffer = 0;
this.currentOffset = 0;
}
-
/**
- * skip the offset and reset the value
- *
+ * skip the offset and reset the value
* @param filePath
* @param offset
* @throws IOException
*/
- protected void initializeFileReader(String filePath, long offset)
- throws IOException, KettleFileException {
+ protected void initializeFileReader(String filePath,long offset) throws IOException,
+ KettleFileException {
if (this.bufferedInputStream != null) {
this.bufferedInputStream.close();
}
@@ -248,9 +236,8 @@ public class BlockDataHandler {
filePath = KettleVFS.getFilename(fileObject);
}
- this.bufferedInputStream = FileFactory
- .getDataInputStream(filePath, FileFactory.getFileType(filePath), data.preferredBufferSize,
- offset);
+ this.bufferedInputStream = FileFactory.getDataInputStream(filePath,
+ FileFactory.getFileType(filePath), data.preferredBufferSize,offset);
this.byteBuffer = new byte[data.preferredBufferSize];
this.bufferSize = 0;
this.startBuffer = 0;
@@ -268,334 +255,337 @@ public class BlockDataHandler {
public Object[] readOneRow(boolean doConversions) throws KettleException {
try {
- while (true) {
- Object[] outputRowData =
- RowDataUtil.allocateRowData(data.outputRowMeta.size() - RowDataUtil.OVER_ALLOCATE_SIZE);
- int outputIndex = 0;
- boolean newLineFound = false;
- boolean endOfBuffer = false;
- int newLines = 0;
- List<Exception> conversionExceptions = null;
- List<ValueMetaInterface> exceptionFields = null;
-
- // The strategy is as follows...
- // We read a block of byte[] from the file.
- // We scan for the separators in the file (NOT for line feeds etc)
- // Then we scan that block of data.
- // We keep a byte[] that we extend if needed..
- // At the end of the block we read another, etc.
- //
- // Let's start by looking where we left off reading.
- //
- while (!newLineFound && outputIndex < meta.getInputFields().length) {
-
- if (checkBufferSize() && outputRowData != null) {
- // Last row was being discarded if the last item is null and
- // there is no end of line delimiter
- //if (outputRowData != null) {
- // Make certain that at least one record exists before
- // filling the rest of them with null
- if (outputIndex > 0) {
- return (outputRowData);
- }
- return null; // nothing more to read, call it a day.
+ Object[] outputRowData = RowDataUtil
+ .allocateRowData(data.outputRowMeta.size() - RowDataUtil.OVER_ALLOCATE_SIZE);
+ int outputIndex = 0;
+ boolean newLineFound = false;
+ boolean endOfBuffer = false;
+ int newLines = 0;
+ List<Exception> conversionExceptions = null;
+ List<ValueMetaInterface> exceptionFields = null;
+
+ // The strategy is as follows...
+ // We read a block of byte[] from the file.
+ // We scan for the separators in the file (NOT for line feeds etc)
+ // Then we scan that block of data.
+ // We keep a byte[] that we extend if needed..
+ // At the end of the block we read another, etc.
+ //
+ // Let's start by looking where we left off reading.
+ //
+ while (!newLineFound && outputIndex < meta.getInputFields().length) {
+
+ if (checkBufferSize() && outputRowData != null) {
+ // Last row was being discarded if the last item is null and
+ // there is no end of line delimiter
+ //if (outputRowData != null) {
+ // Make certain that at least one record exists before
+ // filling the rest of them with null
+ if (outputIndex > 0) {
+ return (outputRowData);
}
- // OK, at this point we should have data in the byteBuffer and we should be able
- // to scan for the next
- // delimiter (;)
- // So let's look for a delimiter.
- // Also skip over the enclosures ("), it is NOT taking into account
- // escaped enclosures.
- // Later we can add an option for having escaped or double enclosures
- // in the file. <sigh>
+ return null; // nothing more to read, call it a day.
+ }
+
+ // OK, at this point we should have data in the byteBuffer and we should be able
+ // to scan for the next
+ // delimiter (;)
+ // So let's look for a delimiter.
+ // Also skip over the enclosures ("), it is NOT taking into account
+ // escaped enclosures.
+ // Later we can add an option for having escaped or double enclosures
+ // in the file. <sigh>
+ //
+ boolean delimiterFound = false;
+ boolean enclosureFound = false;
+ int escapedEnclosureFound = 0;
+ while (!delimiterFound) {
+ // If we find the first char, we might find others as well ;-)
+ // Single byte delimiters only for now.
//
- boolean delimiterFound = false;
- boolean enclosureFound = false;
- int escapedEnclosureFound = 0;
- while (!delimiterFound) {
- // If we find the first char, we might find others as well ;-)
- // Single byte delimiters only for now.
- //
- if (data.delimiterMatcher
- .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)) {
- delimiterFound = true;
+ if (data.delimiterMatcher
+ .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)) {
+ delimiterFound = true;
+ }
+ // Perhaps we found a (pre-mature) new line?
+ //
+ else if (
+ // In case we are not using an enclosure and in case fields contain new
+ // lines we need to make sure that we check the newlines possible flag.
+ // If the flag is enable we skip newline checking except for the last field
+ // in the row. In that one we can't support newlines without
+ // enclosure (handled below).
+ //
+ (!meta.isNewlinePossibleInFields()
+ || outputIndex == meta.getInputFields().length - 1) && (
+ data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+ || data.crLfMatcher
+ .isLineFeed(this.byteBuffer, this.endBuffer))) {
+
+ if (data.encodingType.equals(EncodingType.DOUBLE_LITTLE_ENDIAN)
+ || data.encodingType.equals(EncodingType.DOUBLE_BIG_ENDIAN)) {
+ this.endBuffer += 2;
+ this.currentOffset +=2;
+ } else {
+ this.endBuffer++;
+ this.currentOffset++;
}
- // Perhaps we found a (pre-mature) new line?
- //
- else if (
- // In case we are not using an enclosure and in case fields contain new
- // lines we need to make sure that we check the newlines possible flag.
- // If the flag is enable we skip newline checking except for the last field
- // in the row. In that one we can't support newlines without
- // enclosure (handled below).
- (!meta.isNewlinePossibleInFields()
- || outputIndex == meta.getInputFields().length - 1) && (
- data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
- .isLineFeed(this.byteBuffer, this.endBuffer))) {
-
- if (data.encodingType.equals(EncodingType.DOUBLE_LITTLE_ENDIAN) || data.encodingType
- .equals(EncodingType.DOUBLE_BIG_ENDIAN)) {
- this.endBuffer += 2;
- this.currentOffset += 2;
- } else {
- this.endBuffer++;
- this.currentOffset++;
- }
- this.totalBytesRead++;
- newLines = 1;
+ this.totalBytesRead++;
+ newLines = 1;
+
+ if (this.endBuffer >= this.bufferSize) {
+ // Oops, we need to read more data...
+ // Better resize this before we read other things in it...
+ //
+ this.resizeByteBufferArray();
+
+ // Also read another chunk of data, now that we have the space for it...
+ // Ignore EOF, there might be other stuff in the buffer.
+ //
+ this.readBufferFromFile();
+ }
+ // re-check for double delimiters...
+ if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+ || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
+ this.endBuffer++;
+ this.currentOffset++;
+ this.totalBytesRead++;
+ newLines = 2;
if (this.endBuffer >= this.bufferSize) {
// Oops, we need to read more data...
// Better resize this before we read other things in it...
//
this.resizeByteBufferArray();
- // Also read another chunk of data, now that we have the space for it...
+ // Also read another chunk of data, now that we have the space for
+ // it...
// Ignore EOF, there might be other stuff in the buffer.
//
this.readBufferFromFile();
}
+ }
- // re-check for double delimiters...
- if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
- .isLineFeed(this.byteBuffer, this.endBuffer)) {
- this.endBuffer++;
- this.currentOffset++;
- this.totalBytesRead++;
- newLines = 2;
- if (this.endBuffer >= this.bufferSize) {
- // Oops, we need to read more data...
- // Better resize this before we read other things in it...
- //
- this.resizeByteBufferArray();
-
- // Also read another chunk of data, now that we have the space for
- // it...
- // Ignore EOF, there might be other stuff in the buffer.
- //
- this.readBufferFromFile();
- }
+ newLineFound = true;
+ delimiterFound = true;
+ }
+ // Perhaps we need to skip over an enclosed part?
+ // We always expect exactly one enclosure character
+ // If we find the enclosure doubled, we consider it escaped.
+ // --> "" is converted to " later on.
+ //
+ else if (data.enclosure != null && data.enclosureMatcher
+ .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure)) {
+
+ enclosureFound = true;
+ boolean keepGoing;
+ do {
+ if (this.increaseEndBuffer()) {
+ enclosureFound = false;
+ break;
}
- newLineFound = true;
- delimiterFound = true;
- }
- // Perhaps we need to skip over an enclosed part?
- // We always expect exactly one enclosure character
- // If we find the enclosure doubled, we consider it escaped.
- // --> "" is converted to " later on.
- //
- else if (data.enclosure != null && data.enclosureMatcher
- .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure)) {
+ if (!doConversions) {
+ //when catch the block which need to skip first line
+ //the complete row like: abc,"cdf","efg",hij
+ //but this row is split to different blocks
+ //in this block,the remaining row like : fg",hij
+ //so if we meet the enclosure in the skip line, when we meet \r or \n ,let's break
+ if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+ || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
+ enclosureFound = false;
+ break;
+ }
+ }
- enclosureFound = true;
- boolean keepGoing;
- do {
+ keepGoing = !data.enclosureMatcher
+ .matchesPattern(this.byteBuffer, this.endBuffer,
+ data.enclosure);
+ if (!keepGoing) {
+ // We found an enclosure character.
+ // Read another byte...
if (this.increaseEndBuffer()) {
enclosureFound = false;
break;
}
- if (!doConversions) {
- //when catch the block which need to skip first line
- //the complete row like: abc,"cdf","efg",hij
- //but this row is split to different blocks
- //in this block,the remaining row like : fg",hij
- //so if we meet the enclosure in the skip line, when we meet \r or \n ,let's break
- if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
- .isLineFeed(this.byteBuffer, this.endBuffer)) {
- enclosureFound = false;
- break;
- }
+ // If this character is also an enclosure, we can consider the
+ // enclosure "escaped".
+ // As such, if this is an enclosure, we keep going...
+ //
+ keepGoing = data.enclosureMatcher
+ .matchesPattern(this.byteBuffer, this.endBuffer,
+ data.enclosure);
+ if (keepGoing) {
+ escapedEnclosureFound++;
+ } else {
+ /**
+ * <pre>
+ * fix for customer issue.
+ * after last enclosure there must be either field end or row
+ * end otherwise enclosure is field content.
+ * Example:
+ * EMPNAME, COMPANY
+ * 'emp'aa','comab'
+ * 'empbb','com'cd'
+ * Here enclosure after emp(emp') and after com(com')
+ * are not the last enclosures
+ * </pre>
+ */
+ keepGoing = !(data.delimiterMatcher
+ .matchesPattern(this.byteBuffer, this.endBuffer,
+ data.delimiter) || data.crLfMatcher
+ .isReturn(this.byteBuffer, this.endBuffer)
+ || data.crLfMatcher
+ .isLineFeed(this.byteBuffer, this.endBuffer));
}
- keepGoing = !data.enclosureMatcher
- .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure);
- if (!keepGoing) {
- // We found an enclosure character.
- // Read another byte...
- if (this.increaseEndBuffer()) {
- enclosureFound = false;
- break;
- }
-
- // If this character is also an enclosure, we can consider the
- // enclosure "escaped".
- // As such, if this is an enclosure, we keep going...
- //
- keepGoing = data.enclosureMatcher
- .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure);
- if (keepGoing) {
- escapedEnclosureFound++;
- } else {
- /**
- * <pre>
- * fix for customer issue.
- * after last enclosure there must be either field end or row
- * end otherwise enclosure is field content.
- * Example:
- * EMPNAME, COMPANY
- * 'emp'aa','comab'
- * 'empbb','com'cd'
- * Here enclosure after emp(emp') and after com(com')
- * are not the last enclosures
- * </pre>
- */
- keepGoing = !(data.delimiterMatcher
- .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)
- || data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
- || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer));
- }
+ }
+ } while (keepGoing);
- }
- } while (keepGoing);
+ // Did we reach the end of the buffer?
+ //
+ if (this.endBuffer >= this.bufferSize) {
+ newLineFound = true; // consider it a newline to break out of the upper
+ // while loop
+ newLines += 2; // to remove the enclosures in case of missing
+ // newline on last line.
+ endOfBuffer = true;
+ break;
+ }
+ } else {
- // Did we reach the end of the buffer?
- //
+ this.endBuffer++;
+ this.currentOffset++;
+ this.totalBytesRead++;
+
+ if (checkBufferSize()) {
if (this.endBuffer >= this.bufferSize) {
- newLineFound = true; // consider it a newline to break out of the upper
- // while loop
- newLines += 2; // to remove the enclosures in case of missing
- // newline on last line.
- endOfBuffer = true;
+ newLineFound = true;
break;
}
- } else {
-
- this.endBuffer++;
- this.currentOffset++;
- this.totalBytesRead++;
-
- if (checkBufferSize()) {
- if (this.endBuffer >= this.bufferSize) {
- newLineFound = true;
- break;
- }
- }
}
}
+ }
- // If we're still here, we found a delimiter..
- // Since the starting point never changed really, we just can grab range:
- //
- // [startBuffer-endBuffer[
- //
- // This is the part we want.
- // data.byteBuffer[data.startBuffer]
- //
- int length = calculateFieldLength(newLineFound, newLines, enclosureFound, endOfBuffer);
+ // If we're still here, we found a delimiter..
+ // Since the starting point never changed really, we just can grab range:
+ //
+ // [startBuffer-endBuffer[
+ //
+ // This is the part we want.
+ // data.byteBuffer[data.startBuffer]
+ //
+ int length =
+ calculateFieldLength(newLineFound, newLines, enclosureFound, endOfBuffer);
- byte[] field = new byte[length];
- System.arraycopy(this.byteBuffer, this.startBuffer, field, 0, length);
+ byte[] field = new byte[length];
+ System.arraycopy(this.byteBuffer, this.startBuffer, field, 0, length);
- // Did we have any escaped characters in there?
- //
- if (escapedEnclosureFound > 0) {
- field = this.removeEscapedEnclosures(field, escapedEnclosureFound);
- }
+ // Did we have any escaped characters in there?
+ //
+ if (escapedEnclosureFound > 0) {
+ field = this.removeEscapedEnclosures(field, escapedEnclosureFound);
+ }
- if (doConversions) {
- if (meta.isLazyConversionActive()) {
- outputRowData[outputIndex++] = field;
- } else {
- // We're not lazy so we convert the data right here and now.
- // The convert object uses binary storage as such we just have to ask
- // the native type from it.
- // That will do the actual conversion.
+ if (doConversions) {
+ if (meta.isLazyConversionActive()) {
+ outputRowData[outputIndex++] = field;
+ } else {
+ // We're not lazy so we convert the data right here and now.
+ // The convert object uses binary storage as such we just have to ask
+ // the native type from it.
+ // That will do the actual conversion.
+ //
+ ValueMetaInterface sourceValueMeta =
+ data.convertRowMeta.getValueMeta(outputIndex);
+ try {
+ // when found a blank line, outputRowData will be filled as
+ // Object array = ["@NU#LL$!BLANKLINE", null, null, ... ]
+ if (field.length == 0 && newLineFound && outputIndex == 0) {
+ outputRowData[outputIndex++] = CarbonCommonConstants.BLANK_LINE_FLAG;
+ } else {
+ outputRowData[outputIndex++] =
+ sourceValueMeta.convertBinaryStringToNativeType(field);
+ }
+ } catch (KettleValueException e) {
+ // There was a conversion error,
//
- ValueMetaInterface sourceValueMeta = data.convertRowMeta.getValueMeta(outputIndex);
- try {
- // when found a blank line, outputRowData will be filled as
- // Object array = ["@NU#LL$!BLANKLINE", null, null, ... ]
- if (field.length == 0 && newLineFound && outputIndex == 0) {
- outputRowData[outputIndex++] = CarbonCommonConstants.BLANK_LINE_FLAG;
- } else {
- outputRowData[outputIndex++] =
- sourceValueMeta.convertBinaryStringToNativeType(field);
- }
- } catch (KettleValueException e) {
- // There was a conversion error,
- //
- outputRowData[outputIndex++] = null;
-
- if (conversionExceptions == null) {
- conversionExceptions =
- new ArrayList<Exception>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- exceptionFields =
- new ArrayList<ValueMetaInterface>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- }
+ outputRowData[outputIndex++] = null;
- conversionExceptions.add(e);
- exceptionFields.add(sourceValueMeta);
+ if (conversionExceptions == null) {
+ conversionExceptions = new ArrayList<Exception>(
+ CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ exceptionFields = new ArrayList<ValueMetaInterface>(
+ CarbonCommonConstants.CONSTANT_SIZE_TEN);
}
+
+ conversionExceptions.add(e);
+ exceptionFields.add(sourceValueMeta);
}
- } else {
- outputRowData[outputIndex++] = null; // nothing for the header, no conversions here.
}
+ } else {
+ outputRowData[outputIndex++] =
+ null; // nothing for the header, no conversions here.
+ }
- // OK, move on to the next field...
- if (!newLineFound) {
- this.endBuffer++;
- this.currentOffset++;
- this.totalBytesRead++;
+ // OK, move on to the next field...
+ if (!newLineFound) {
+ this.endBuffer++;
+ this.currentOffset++;
+ this.totalBytesRead++;
+ }
+ this.startBuffer = this.endBuffer;
+ }
+
+ // See if we reached the end of the line.
+ // If not, we need to skip the remaining items on the line until the next newline...
+ if (!newLineFound && !checkBufferSize()) {
+ while (!data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+ && !data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)){
+ this.endBuffer++;
+ this.currentOffset++;
+ this.totalBytesRead++;
+
+ if (checkBufferSize()) {
+ break; // nothing more to read.
}
- this.startBuffer = this.endBuffer;
+
+ // HANDLE: if we're using quoting we might be dealing with a very dirty file
+ // with quoted newlines in trailing fields. (imagine that)
+ // In that particular case we want to use the same logic we use above
+ // (refactored a bit) to skip these fields.
+
}
- // See if we reached the end of the line.
- // If not, we need to skip the remaining items on the line until the next newline...
- if (!newLineFound && !checkBufferSize()) {
- while (!data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) && !data.crLfMatcher
- .isLineFeed(this.byteBuffer, this.endBuffer)) {
+ if (!checkBufferSize()) {
+ while (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+ || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
this.endBuffer++;
this.currentOffset++;
this.totalBytesRead++;
-
if (checkBufferSize()) {
break; // nothing more to read.
}
-
- // HANDLE: if we're using quoting we might be dealing with a very dirty file
- // with quoted newlines in trailing fields. (imagine that)
- // In that particular case we want to use the same logic we use above
- // (refactored a bit) to skip these fields.
-
}
+ }
- if (!checkBufferSize()) {
- while (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
- .isLineFeed(this.byteBuffer, this.endBuffer)) {
- this.endBuffer++;
- this.currentOffset++;
- this.totalBytesRead++;
- if (checkBufferSize()) {
- break; // nothing more to read.
- }
- }
- }
+ // Make sure we start at the right position the next time around.
+ this.startBuffer = this.endBuffer;
+ }
- // Make sure we start at the right position the next time around.
- this.startBuffer = this.endBuffer;
- }
- // incrementLinesInput();
- if (conversionExceptions != null && conversionExceptions.size() > 0) {
- // Forward the first exception
- throw new KettleConversionException(
- "There were " + conversionExceptions.size() + " conversion errors on line ",
- conversionExceptions, exceptionFields, outputRowData);
- }
- if (outputIndex > 0 && outputIndex < meta.getInputFields().length) {
- badRecordslogger.addBadRecordsToBilder(outputRowData, meta.getInputFields().length,
- "Row record is not in valid csv format.", null);
- continue;
- } else {
- return outputRowData;
- }
+ // incrementLinesInput();
+ if (conversionExceptions != null && conversionExceptions.size() > 0) {
+ // Forward the first exception
+ throw new KettleConversionException("There were " + conversionExceptions.size()
+ + " conversion errors on line ", conversionExceptions, exceptionFields, outputRowData);
}
+
+ return outputRowData;
} catch (KettleConversionException e) {
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 525902d..76d5716 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -386,7 +386,8 @@ public class CsvInput extends BaseStep implements StepInterface {
doProcess();
LOGGER.info("*****************Completed csv reading by thread***********");
} catch (Throwable e) {
- LOGGER.error(e, "Thread is terminated due to error");
+ LOGGER.error(e,
+ "Thread is terminated due to error");
}
return null;
}
@@ -477,9 +478,6 @@ public class CsvInput extends BaseStep implements StepInterface {
if (blockDataHandler.bufferedInputStream != null) {
blockDataHandler.bufferedInputStream.close();
}
- if (null != blockDataHandler.badRecordslogger) {
- blockDataHandler.badRecordslogger.closeStreams();
- }
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -588,4 +586,4 @@ public class CsvInput extends BaseStep implements StepInterface {
return false;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
index b448f02..6f895b1 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
@@ -49,7 +49,13 @@ import org.pentaho.di.resource.ResourceNamingInterface;
import org.pentaho.di.resource.ResourceReference;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.*;
+import org.pentaho.di.trans.step.BaseStepMeta;
+import org.pentaho.di.trans.step.StepDataInterface;
+import org.pentaho.di.trans.step.StepInjectionMetaEntry;
+import org.pentaho.di.trans.step.StepInterface;
+import org.pentaho.di.trans.step.StepMeta;
+import org.pentaho.di.trans.step.StepMetaInjectionInterface;
+import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.textfileinput.InputFileMetaInterface;
import org.pentaho.di.trans.steps.textfileinput.TextFileInputField;
import org.pentaho.di.trans.steps.textfileinput.TextFileInputMeta;
@@ -90,14 +96,6 @@ public class CsvInputMeta extends BaseStepMeta
private int currentRestructNumber;
private String blocksID;
- /**
- * database name
- */
- private String databaseName;
- /**
- * tableName
- */
- private String tableName;
public CsvInputMeta() {
super(); // allocate BaseStepMeta
@@ -118,16 +116,10 @@ public class CsvInputMeta extends BaseStepMeta
bufferSize = "50000";
currentRestructNumber = -1;
blocksID = "";
- databaseName = "";
- tableName = "";
-
-
}
private void readData(Node stepnode) throws KettleXMLException {
try {
- databaseName = XMLHandler.getTagValue(stepnode, "databaseName");
- tableName = XMLHandler.getTagValue(stepnode, "tableName");
filename = XMLHandler.getTagValue(stepnode, getXmlCode("FILENAME"));
filenameField = XMLHandler.getTagValue(stepnode, getXmlCode("FILENAME_FIELD"));
rowNumField = XMLHandler.getTagValue(stepnode, getXmlCode("ROW_NUM_FIELD"));
@@ -197,8 +189,7 @@ public class CsvInputMeta extends BaseStepMeta
public String getXML() {
StringBuffer retval = new StringBuffer(500);
- retval.append(" ").append(XMLHandler.addTagValue("databaseName", databaseName));
- retval.append(" ").append(XMLHandler.addTagValue("tableName", tableName));
+
retval.append(" ").append(XMLHandler.addTagValue(getXmlCode("FILENAME"), filename));
retval.append(" ")
.append(XMLHandler.addTagValue(getXmlCode("FILENAME_FIELD"), filenameField));
@@ -257,8 +248,6 @@ public class CsvInputMeta extends BaseStepMeta
public void readRep(Repository rep, ObjectId idStep, List<DatabaseMeta> databases,
Map<String, Counter> counters) throws KettleException {
try {
- databaseName = rep.getStepAttributeString(idStep, getRepCode("databaseName"));
- tableName = rep.getStepAttributeString(idStep, getRepCode("tableName"));
filename = rep.getStepAttributeString(idStep, getRepCode("FILENAME"));
filenameField = rep.getStepAttributeString(idStep, getRepCode("FILENAME_FIELD"));
rowNumField = rep.getStepAttributeString(idStep, getRepCode("ROW_NUM_FIELD"));
@@ -309,8 +298,6 @@ public class CsvInputMeta extends BaseStepMeta
public void saveRep(Repository rep, ObjectId idTransformation, ObjectId idStep)
throws KettleException {
try {
- rep.saveStepAttribute(idTransformation, idStep, getRepCode("databaseName"), databaseName);
- rep.saveStepAttribute(idTransformation, idStep, getRepCode("databaseName"), tableName);
rep.saveStepAttribute(idTransformation, idStep, getRepCode("FILENAME"), filename);
rep.saveStepAttribute(idTransformation, idStep, getRepCode("FILENAME_FIELD"), filenameField);
rep.saveStepAttribute(idTransformation, idStep, getRepCode("ROW_NUM_FIELD"), rowNumField);
@@ -794,11 +781,7 @@ public class CsvInputMeta extends BaseStepMeta
//
String attributeKey = attr.getKey();
if (entry.getValueType() != ValueMetaInterface.TYPE_NONE) {
- if ("databaseName".equals(attributeKey)) {
- databaseName = (String) entry.getValue();
- } else if ("tableName".equals(attributeKey)) {
- tableName = (String) entry.getValue();
- } else if ("FILENAME".equals(attributeKey)) {
+ if ("FILENAME".equals(attributeKey)) {
filename = (String) entry.getValue();
} else if ("FILENAME_FIELD".equals(attributeKey)) {
filenameField = (String) entry.getValue();
@@ -922,35 +905,4 @@ public class CsvInputMeta extends BaseStepMeta
this.currentRestructNumber = currentRestructNum;
}
- /**
- * retuns database name
- * @return
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * return tableName
- * @return
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * set databasename
- * @param databaseName
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * set tabke name
- * @param tableName
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index 682b3d1..00f233c 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -440,8 +440,7 @@ public class GraphGenerator {
csvInputMeta.setBlocksID(this.blocksID);
csvDataStep.setDraw(true);
csvDataStep.setDescription("Read raw data from " + GraphGeneratorConstants.CSV_INPUT);
- csvInputMeta.setDatabaseName(schemaInfo.getSchemaName());
- csvInputMeta.setTableName(schemaInfo.getCubeName());
+
return csvDataStep;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index 6ac5722..e40bd25 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -27,8 +27,15 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.sql.Connection;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -362,10 +369,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
columnsInfo.setDimensionColumnIds(meta.getDimensionColumnIds());
columnsInfo.setColumnSchemaDetailsWrapper(meta.getColumnSchemaDetailsWrapper());
updateBagLogFileName();
- csvFilepath = CarbonDataProcessorUtil.getBagLogFileName(csvFilepath);
String key = meta.getSchemaName() + '/' + meta.getCubeName() + '_' + meta.getTableName();
- badRecordslogger = new BadRecordslogger(key, csvFilepath, CarbonDataProcessorUtil
- .getBadLogStoreLocation(meta.getSchemaName() + '/' + meta.getCubeName()));
+ badRecordslogger = new BadRecordslogger(key, csvFilepath,
+ getBadLogStoreLocation(meta.getSchemaName() + '/' + meta.getCubeName()));
columnsInfo.setTimeOrdinalIndices(meta.timeOrdinalIndices);
surrogateKeyGen = new FileStoreSurrogateKeyGenForCSV(columnsInfo, meta.getPartitionID(),
@@ -694,6 +700,14 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
meta.getPartitionID(), meta.getSegmentId()+"");
}
+ private String getBadLogStoreLocation(String storeLocation) {
+ String badLogStoreLocation =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+ badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+ return badLogStoreLocation;
+ }
+
private void updateBagLogFileName() {
csvFilepath = new File(csvFilepath).getName();
if (csvFilepath.indexOf(".") > -1) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index a64a256..8f2c9e7 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -79,6 +79,7 @@ public final class CarbonDataProcessorUtil {
return fileBufferSize;
}
+
/**
* Utility method to get level cardinality string
*
@@ -153,6 +154,7 @@ public final class CarbonDataProcessorUtil {
}// CHECKSTYLE:ON
}
+
public static void checkResult(List<CheckResultInterface> remarks, StepMeta stepMeta,
String[] input) {
CheckResult cr;
@@ -274,34 +276,4 @@ public final class CarbonDataProcessorUtil {
String localDataLoadFolderLocation = carbonDataDirectoryPath + File.separator + taskId;
return localDataLoadFolderLocation;
}
-
- /**
- * The method returns the bad record store location
- *
- * @param storeLocation
- * @return
- */
- public static String getBadLogStoreLocation(String storeLocation) {
- String badLogStoreLocation =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
- badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
- return badLogStoreLocation;
- }
-
- /**
- * method returns the bad log file name
- *
- * @param csvFilepath
- * @return
- */
- public static String getBagLogFileName(String csvFilepath) {
- csvFilepath = new File(csvFilepath).getName();
- if (csvFilepath.indexOf(".") > -1) {
- csvFilepath = csvFilepath.substring(0, csvFilepath.indexOf("."));
- }
-
- return csvFilepath + '_' + System.currentTimeMillis() + ".log";
-
- }
-}
+}
\ No newline at end of file