You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:00 UTC

[12/56] [abbrv] incubator-carbondata git commit: Revert "[Issue - 626] csv file having some rows not in proper format is not treating them to the bad records." (#687)

Revert "[Issue - 626] csv file having some rows not in proper format is not treating them to the bad records." (#687)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6c2469db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6c2469db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6c2469db

Branch: refs/heads/master
Commit: 6c2469db288a7df59284c7a13aaed1d0b417edef
Parents: 075dd92
Author: Venkata Ramana G <g....@gmail.com>
Authored: Fri Jun 17 22:58:00 2016 +0530
Committer: GitHub <no...@github.com>
Committed: Fri Jun 17 22:58:00 2016 +0530

----------------------------------------------------------------------
 .../src/test/resources/InvalidCsvFormatdata.csv |   3 -
 .../csvreaderstep/BlockDataHandler.java         | 576 +++++++++----------
 .../processing/csvreaderstep/CsvInput.java      |  10 +-
 .../processing/csvreaderstep/CsvInputMeta.java  |  66 +--
 .../graphgenerator/GraphGenerator.java          |   3 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  22 +-
 .../util/CarbonDataProcessorUtil.java           |  34 +-
 7 files changed, 318 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/InvalidCsvFormatdata.csv b/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
deleted file mode 100644
index d61dd50..0000000
--- a/integration/spark/src/test/resources/InvalidCsvFormatdata.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-ID,date,country,name,phonetype,serialname,salary
-1,2015/7/23,china,aaa1,phone197,ASD69643,15000
-2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
index d832504..29b4a54 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/BlockDataHandler.java
@@ -27,8 +27,6 @@ import java.util.List;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.load.BlockDetails;
-import org.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
-import org.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.commons.vfs.FileObject;
 import org.pentaho.di.core.exception.KettleConversionException;
@@ -54,21 +52,19 @@ public class BlockDataHandler {
   public int bufferSize;
   public long bytesToSkipInFirstFile;
   public long totalBytesRead;
-  public CsvInputMeta meta;
-  public CsvInputData data;
+  public CsvInputMeta meta ;
+  public CsvInputData data ;
   public TransMeta transMeta;
   public boolean isNeedToSkipFirstLineInBlock;
   public long currentOffset;
 
   protected InputStream bufferedInputStream;
-  protected BadRecordslogger badRecordslogger;
-  private String badRecordFileName;
+
 
   public BlockDataHandler() {
     byteBuffer = new byte[] {};
     isNeedToSkipFirstLineInBlock = true;
     currentOffset = 0;
-
   }
 
   // Resize
@@ -202,12 +198,7 @@ public class BlockDataHandler {
 
       // Don't skip again in the next file...
       this.bytesToSkipInFirstFile = -1L;
-      String key = meta.getDatabaseName() + '/' + meta.getTableName() + '_' + meta.getTableName();
-      badRecordFileName = transMeta.getVariable("csvInputFilePath");
-      badRecordFileName = null != badRecordFileName ? badRecordFileName : meta.getTableName();
-      badRecordFileName = CarbonDataProcessorUtil.getBagLogFileName(badRecordFileName);
-      badRecordslogger = new BadRecordslogger(key, badRecordFileName, CarbonDataProcessorUtil
-          .getBadLogStoreLocation(meta.getDatabaseName() + '/' + meta.getTableName()));
+
       return true;
     } catch (KettleException e) {
       throw e;
@@ -215,12 +206,11 @@ public class BlockDataHandler {
       throw new KettleException(e);
     }
   }
-
   protected void initializeFileReader(FileObject fileObject) throws IOException {
     //using file object to get path can return a valid path which for new inputstream
     String filePath = KettleVFS.getFilename(fileObject);
-    this.bufferedInputStream = FileFactory
-        .getDataInputStream(filePath, FileFactory.getFileType(filePath), data.preferredBufferSize);
+    this.bufferedInputStream = FileFactory.getDataInputStream(filePath,
+        FileFactory.getFileType(filePath), data.preferredBufferSize);
     //when open a new file, need to initialize all info
     this.byteBuffer = new byte[data.preferredBufferSize];
     this.bufferSize = 0;
@@ -228,16 +218,14 @@ public class BlockDataHandler {
     this.endBuffer = 0;
     this.currentOffset = 0;
   }
-
   /**
-   * skip the offset and reset the value
-   *
+   *  skip the offset and reset the value
    * @param filePath
    * @param offset
    * @throws IOException
    */
-  protected void initializeFileReader(String filePath, long offset)
-      throws IOException, KettleFileException {
+  protected void initializeFileReader(String filePath,long offset) throws IOException,
+          KettleFileException {
     if (this.bufferedInputStream != null) {
       this.bufferedInputStream.close();
     }
@@ -248,9 +236,8 @@ public class BlockDataHandler {
       filePath = KettleVFS.getFilename(fileObject);
     }
 
-    this.bufferedInputStream = FileFactory
-        .getDataInputStream(filePath, FileFactory.getFileType(filePath), data.preferredBufferSize,
-            offset);
+    this.bufferedInputStream = FileFactory.getDataInputStream(filePath,
+        FileFactory.getFileType(filePath), data.preferredBufferSize,offset);
     this.byteBuffer = new byte[data.preferredBufferSize];
     this.bufferSize = 0;
     this.startBuffer = 0;
@@ -268,334 +255,337 @@ public class BlockDataHandler {
   public Object[] readOneRow(boolean doConversions) throws KettleException {
 
     try {
-      while (true) {
-        Object[] outputRowData =
-            RowDataUtil.allocateRowData(data.outputRowMeta.size() - RowDataUtil.OVER_ALLOCATE_SIZE);
-        int outputIndex = 0;
-        boolean newLineFound = false;
-        boolean endOfBuffer = false;
-        int newLines = 0;
-        List<Exception> conversionExceptions = null;
-        List<ValueMetaInterface> exceptionFields = null;
-
-        // The strategy is as follows...
-        // We read a block of byte[] from the file.
-        // We scan for the separators in the file (NOT for line feeds etc)
-        // Then we scan that block of data.
-        // We keep a byte[] that we extend if needed..
-        // At the end of the block we read another, etc.
-        //
-        // Let's start by looking where we left off reading.
-        //
-        while (!newLineFound && outputIndex < meta.getInputFields().length) {
-
-          if (checkBufferSize() && outputRowData != null) {
-            // Last row was being discarded if the last item is null and
-            // there is no end of line delimiter
-            //if (outputRowData != null) {
-            // Make certain that at least one record exists before
-            // filling the rest of them with null
-            if (outputIndex > 0) {
-              return (outputRowData);
-            }
 
-            return null; // nothing more to read, call it a day.
+      Object[] outputRowData = RowDataUtil
+          .allocateRowData(data.outputRowMeta.size() - RowDataUtil.OVER_ALLOCATE_SIZE);
+      int outputIndex = 0;
+      boolean newLineFound = false;
+      boolean endOfBuffer = false;
+      int newLines = 0;
+      List<Exception> conversionExceptions = null;
+      List<ValueMetaInterface> exceptionFields = null;
+
+      // The strategy is as follows...
+      // We read a block of byte[] from the file.
+      // We scan for the separators in the file (NOT for line feeds etc)
+      // Then we scan that block of data.
+      // We keep a byte[] that we extend if needed..
+      // At the end of the block we read another, etc.
+      //
+      // Let's start by looking where we left off reading.
+      //
+      while (!newLineFound && outputIndex < meta.getInputFields().length) {
+
+        if (checkBufferSize() && outputRowData != null) {
+          // Last row was being discarded if the last item is null and
+          // there is no end of line delimiter
+          //if (outputRowData != null) {
+          // Make certain that at least one record exists before
+          // filling the rest of them with null
+          if (outputIndex > 0) {
+            return (outputRowData);
           }
 
-          // OK, at this point we should have data in the byteBuffer and we should be able
-          // to scan for the next
-          // delimiter (;)
-          // So let's look for a delimiter.
-          // Also skip over the enclosures ("), it is NOT taking into account
-          // escaped enclosures.
-          // Later we can add an option for having escaped or double enclosures
-          // in the file. <sigh>
+          return null; // nothing more to read, call it a day.
+        }
+
+        // OK, at this point we should have data in the byteBuffer and we should be able
+        // to scan for the next
+        // delimiter (;)
+        // So let's look for a delimiter.
+        // Also skip over the enclosures ("), it is NOT taking into account
+        // escaped enclosures.
+        // Later we can add an option for having escaped or double enclosures
+        // in the file. <sigh>
+        //
+        boolean delimiterFound = false;
+        boolean enclosureFound = false;
+        int escapedEnclosureFound = 0;
+        while (!delimiterFound) {
+          // If we find the first char, we might find others as well ;-)
+          // Single byte delimiters only for now.
           //
-          boolean delimiterFound = false;
-          boolean enclosureFound = false;
-          int escapedEnclosureFound = 0;
-          while (!delimiterFound) {
-            // If we find the first char, we might find others as well ;-)
-            // Single byte delimiters only for now.
-            //
-            if (data.delimiterMatcher
-                .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)) {
-              delimiterFound = true;
+          if (data.delimiterMatcher
+              .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)) {
+            delimiterFound = true;
+          }
+          // Perhaps we found a (pre-mature) new line?
+          //
+          else if (
+              // In case we are not using an enclosure and in case fields contain new
+              // lines we need to make sure that we check the newlines possible flag.
+              // If the flag is enable we skip newline checking except for the last field
+              // in the row. In that one we can't support newlines without
+              // enclosure (handled below).
+              //
+              (!meta.isNewlinePossibleInFields()
+                  || outputIndex == meta.getInputFields().length - 1) && (
+                  data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+                      || data.crLfMatcher
+                      .isLineFeed(this.byteBuffer, this.endBuffer))) {
+
+            if (data.encodingType.equals(EncodingType.DOUBLE_LITTLE_ENDIAN)
+                || data.encodingType.equals(EncodingType.DOUBLE_BIG_ENDIAN)) {
+              this.endBuffer += 2;
+              this.currentOffset +=2;
+            } else {
+              this.endBuffer++;
+              this.currentOffset++;
             }
-            // Perhaps we found a (pre-mature) new line?
-            //
-            else if (
-                // In case we are not using an enclosure and in case fields contain new
-                // lines we need to make sure that we check the newlines possible flag.
-                // If the flag is enable we skip newline checking except for the last field
-                // in the row. In that one we can't support newlines without
-                // enclosure (handled below).
-                (!meta.isNewlinePossibleInFields()
-                    || outputIndex == meta.getInputFields().length - 1) && (
-                    data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
-                        .isLineFeed(this.byteBuffer, this.endBuffer))) {
-
-              if (data.encodingType.equals(EncodingType.DOUBLE_LITTLE_ENDIAN) || data.encodingType
-                  .equals(EncodingType.DOUBLE_BIG_ENDIAN)) {
-                this.endBuffer += 2;
-                this.currentOffset += 2;
-              } else {
-                this.endBuffer++;
-                this.currentOffset++;
-              }
 
-              this.totalBytesRead++;
-              newLines = 1;
+            this.totalBytesRead++;
+            newLines = 1;
+
+            if (this.endBuffer >= this.bufferSize) {
+              // Oops, we need to read more data...
+              // Better resize this before we read other things in it...
+              //
+              this.resizeByteBufferArray();
+
+              // Also read another chunk of data, now that we have the space for it...
+              // Ignore EOF, there might be other stuff in the buffer.
+              //
+              this.readBufferFromFile();
+            }
 
+            // re-check for double delimiters...
+            if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+                || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
+              this.endBuffer++;
+              this.currentOffset++;
+              this.totalBytesRead++;
+              newLines = 2;
               if (this.endBuffer >= this.bufferSize) {
                 // Oops, we need to read more data...
                 // Better resize this before we read other things in it...
                 //
                 this.resizeByteBufferArray();
 
-                // Also read another chunk of data, now that we have the space for it...
+                // Also read another chunk of data, now that we have the space for
+                // it...
                 // Ignore EOF, there might be other stuff in the buffer.
                 //
                 this.readBufferFromFile();
               }
+            }
 
-              // re-check for double delimiters...
-              if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
-                  .isLineFeed(this.byteBuffer, this.endBuffer)) {
-                this.endBuffer++;
-                this.currentOffset++;
-                this.totalBytesRead++;
-                newLines = 2;
-                if (this.endBuffer >= this.bufferSize) {
-                  // Oops, we need to read more data...
-                  // Better resize this before we read other things in it...
-                  //
-                  this.resizeByteBufferArray();
-
-                  // Also read another chunk of data, now that we have the space for
-                  // it...
-                  // Ignore EOF, there might be other stuff in the buffer.
-                  //
-                  this.readBufferFromFile();
-                }
+            newLineFound = true;
+            delimiterFound = true;
+          }
+          // Perhaps we need to skip over an enclosed part?
+          // We always expect exactly one enclosure character
+          // If we find the enclosure doubled, we consider it escaped.
+          // --> "" is converted to " later on.
+          //
+          else if (data.enclosure != null && data.enclosureMatcher
+              .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure)) {
+
+            enclosureFound = true;
+            boolean keepGoing;
+            do {
+              if (this.increaseEndBuffer()) {
+                enclosureFound = false;
+                break;
               }
 
-              newLineFound = true;
-              delimiterFound = true;
-            }
-            // Perhaps we need to skip over an enclosed part?
-            // We always expect exactly one enclosure character
-            // If we find the enclosure doubled, we consider it escaped.
-            // --> "" is converted to " later on.
-            //
-            else if (data.enclosure != null && data.enclosureMatcher
-                .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure)) {
+              if (!doConversions) {
+                //when catch the block which need to skip first line
+                //the complete row like: abc,"cdf","efg",hij
+                //but this row is split to different blocks
+                //in this block,the remaining row like :  fg",hij
+                //so if we meet the enclosure in the skip line, when we meet \r or \n ,let's break
+                if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+                        || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
+                  enclosureFound = false;
+                  break;
+                }
+              }
 
-              enclosureFound = true;
-              boolean keepGoing;
-              do {
+              keepGoing = !data.enclosureMatcher
+                  .matchesPattern(this.byteBuffer, this.endBuffer,
+                      data.enclosure);
+              if (!keepGoing) {
+                // We found an enclosure character.
+                // Read another byte...
                 if (this.increaseEndBuffer()) {
                   enclosureFound = false;
                   break;
                 }
 
-                if (!doConversions) {
-                  //when catch the block which need to skip first line
-                  //the complete row like: abc,"cdf","efg",hij
-                  //but this row is split to different blocks
-                  //in this block,the remaining row like :  fg",hij
-                  //so if we meet the enclosure in the skip line, when we meet \r or \n ,let's break
-                  if (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
-                      .isLineFeed(this.byteBuffer, this.endBuffer)) {
-                    enclosureFound = false;
-                    break;
-                  }
+                // If this character is also an enclosure, we can consider the
+                // enclosure "escaped".
+                // As such, if this is an enclosure, we keep going...
+                //
+                keepGoing = data.enclosureMatcher
+                    .matchesPattern(this.byteBuffer, this.endBuffer,
+                        data.enclosure);
+                if (keepGoing) {
+                  escapedEnclosureFound++;
+                } else {
+                  /**
+                   * <pre>
+                   * fix for customer issue.
+                   * after last enclosure there must be either field end or row
+                   * end otherwise enclosure is field content.
+                   * Example:
+                   * EMPNAME, COMPANY
+                   * 'emp'aa','comab'
+                   * 'empbb','com'cd'
+                   * Here enclosure after emp(emp') and after com(com')
+                   * are not the last enclosures
+                   * </pre>
+                   */
+                  keepGoing = !(data.delimiterMatcher
+                      .matchesPattern(this.byteBuffer, this.endBuffer,
+                          data.delimiter) || data.crLfMatcher
+                      .isReturn(this.byteBuffer, this.endBuffer)
+                      || data.crLfMatcher
+                      .isLineFeed(this.byteBuffer, this.endBuffer));
                 }
 
-                keepGoing = !data.enclosureMatcher
-                    .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure);
-                if (!keepGoing) {
-                  // We found an enclosure character.
-                  // Read another byte...
-                  if (this.increaseEndBuffer()) {
-                    enclosureFound = false;
-                    break;
-                  }
-
-                  // If this character is also an enclosure, we can consider the
-                  // enclosure "escaped".
-                  // As such, if this is an enclosure, we keep going...
-                  //
-                  keepGoing = data.enclosureMatcher
-                      .matchesPattern(this.byteBuffer, this.endBuffer, data.enclosure);
-                  if (keepGoing) {
-                    escapedEnclosureFound++;
-                  } else {
-                    /**
-                     * <pre>
-                     * fix for customer issue.
-                     * after last enclosure there must be either field end or row
-                     * end otherwise enclosure is field content.
-                     * Example:
-                     * EMPNAME, COMPANY
-                     * 'emp'aa','comab'
-                     * 'empbb','com'cd'
-                     * Here enclosure after emp(emp') and after com(com')
-                     * are not the last enclosures
-                     * </pre>
-                     */
-                    keepGoing = !(data.delimiterMatcher
-                        .matchesPattern(this.byteBuffer, this.endBuffer, data.delimiter)
-                        || data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
-                        || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer));
-                  }
+              }
+            } while (keepGoing);
 
-                }
-              } while (keepGoing);
+            // Did we reach the end of the buffer?
+            //
+            if (this.endBuffer >= this.bufferSize) {
+              newLineFound = true; // consider it a newline to break out of the upper
+              // while loop
+              newLines += 2; // to remove the enclosures in case of missing
+              // newline on last line.
+              endOfBuffer = true;
+              break;
+            }
+          } else {
 
-              // Did we reach the end of the buffer?
-              //
+            this.endBuffer++;
+            this.currentOffset++;
+            this.totalBytesRead++;
+
+            if (checkBufferSize()) {
               if (this.endBuffer >= this.bufferSize) {
-                newLineFound = true; // consider it a newline to break out of the upper
-                // while loop
-                newLines += 2; // to remove the enclosures in case of missing
-                // newline on last line.
-                endOfBuffer = true;
+                newLineFound = true;
                 break;
               }
-            } else {
-
-              this.endBuffer++;
-              this.currentOffset++;
-              this.totalBytesRead++;
-
-              if (checkBufferSize()) {
-                if (this.endBuffer >= this.bufferSize) {
-                  newLineFound = true;
-                  break;
-                }
-              }
             }
           }
+        }
 
-          // If we're still here, we found a delimiter..
-          // Since the starting point never changed really, we just can grab range:
-          //
-          //    [startBuffer-endBuffer[
-          //
-          // This is the part we want.
-          // data.byteBuffer[data.startBuffer]
-          //
-          int length = calculateFieldLength(newLineFound, newLines, enclosureFound, endOfBuffer);
+        // If we're still here, we found a delimiter..
+        // Since the starting point never changed really, we just can grab range:
+        //
+        //    [startBuffer-endBuffer[
+        //
+        // This is the part we want.
+        // data.byteBuffer[data.startBuffer]
+        //
+        int length =
+            calculateFieldLength(newLineFound, newLines, enclosureFound, endOfBuffer);
 
-          byte[] field = new byte[length];
-          System.arraycopy(this.byteBuffer, this.startBuffer, field, 0, length);
+        byte[] field = new byte[length];
+        System.arraycopy(this.byteBuffer, this.startBuffer, field, 0, length);
 
-          // Did we have any escaped characters in there?
-          //
-          if (escapedEnclosureFound > 0) {
-            field = this.removeEscapedEnclosures(field, escapedEnclosureFound);
-          }
+        // Did we have any escaped characters in there?
+        //
+        if (escapedEnclosureFound > 0) {
+          field = this.removeEscapedEnclosures(field, escapedEnclosureFound);
+        }
 
-          if (doConversions) {
-            if (meta.isLazyConversionActive()) {
-              outputRowData[outputIndex++] = field;
-            } else {
-              // We're not lazy so we convert the data right here and now.
-              // The convert object uses binary storage as such we just have to ask
-              // the native type from it.
-              // That will do the actual conversion.
+        if (doConversions) {
+          if (meta.isLazyConversionActive()) {
+            outputRowData[outputIndex++] = field;
+          } else {
+            // We're not lazy so we convert the data right here and now.
+            // The convert object uses binary storage as such we just have to ask
+            // the native type from it.
+            // That will do the actual conversion.
+            //
+            ValueMetaInterface sourceValueMeta =
+                data.convertRowMeta.getValueMeta(outputIndex);
+            try {
+              // when found a blank line, outputRowData will be filled as
+              // Object array = ["@NU#LL$!BLANKLINE", null, null, ... ]
+              if (field.length == 0 && newLineFound && outputIndex == 0) {
+                outputRowData[outputIndex++] = CarbonCommonConstants.BLANK_LINE_FLAG;
+              } else {
+                outputRowData[outputIndex++] =
+                  sourceValueMeta.convertBinaryStringToNativeType(field);
+              }
+            } catch (KettleValueException e) {
+              // There was a conversion error,
               //
-              ValueMetaInterface sourceValueMeta = data.convertRowMeta.getValueMeta(outputIndex);
-              try {
-                // when found a blank line, outputRowData will be filled as
-                // Object array = ["@NU#LL$!BLANKLINE", null, null, ... ]
-                if (field.length == 0 && newLineFound && outputIndex == 0) {
-                  outputRowData[outputIndex++] = CarbonCommonConstants.BLANK_LINE_FLAG;
-                } else {
-                  outputRowData[outputIndex++] =
-                      sourceValueMeta.convertBinaryStringToNativeType(field);
-                }
-              } catch (KettleValueException e) {
-                // There was a conversion error,
-                //
-                outputRowData[outputIndex++] = null;
-
-                if (conversionExceptions == null) {
-                  conversionExceptions =
-                      new ArrayList<Exception>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-                  exceptionFields =
-                      new ArrayList<ValueMetaInterface>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-                }
+              outputRowData[outputIndex++] = null;
 
-                conversionExceptions.add(e);
-                exceptionFields.add(sourceValueMeta);
+              if (conversionExceptions == null) {
+                conversionExceptions = new ArrayList<Exception>(
+                    CarbonCommonConstants.CONSTANT_SIZE_TEN);
+                exceptionFields = new ArrayList<ValueMetaInterface>(
+                    CarbonCommonConstants.CONSTANT_SIZE_TEN);
               }
+
+              conversionExceptions.add(e);
+              exceptionFields.add(sourceValueMeta);
             }
-          } else {
-            outputRowData[outputIndex++] = null; // nothing for the header, no conversions here.
           }
+        } else {
+          outputRowData[outputIndex++] =
+              null; // nothing for the header, no conversions here.
+        }
 
-          // OK, move on to the next field...
-          if (!newLineFound) {
-            this.endBuffer++;
-            this.currentOffset++;
-            this.totalBytesRead++;
+        // OK, move on to the next field...
+        if (!newLineFound) {
+          this.endBuffer++;
+          this.currentOffset++;
+          this.totalBytesRead++;
+        }
+        this.startBuffer = this.endBuffer;
+      }
+
+      // See if we reached the end of the line.
+      // If not, we need to skip the remaining items on the line until the next newline...
+      if (!newLineFound && !checkBufferSize()) {
+        while (!data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+                && !data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)){
+          this.endBuffer++;
+          this.currentOffset++;
+          this.totalBytesRead++;
+
+          if (checkBufferSize()) {
+            break; // nothing more to read.
           }
-          this.startBuffer = this.endBuffer;
+
+          // HANDLE: if we're using quoting we might be dealing with a very dirty file
+          // with quoted newlines in trailing fields. (imagine that)
+          // In that particular case we want to use the same logic we use above
+          // (refactored a bit) to skip these fields.
+
         }
 
-        // See if we reached the end of the line.
-        // If not, we need to skip the remaining items on the line until the next newline...
-        if (!newLineFound && !checkBufferSize()) {
-          while (!data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) && !data.crLfMatcher
-              .isLineFeed(this.byteBuffer, this.endBuffer)) {
+        if (!checkBufferSize()) {
+          while (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer)
+              || data.crLfMatcher.isLineFeed(this.byteBuffer, this.endBuffer)) {
             this.endBuffer++;
             this.currentOffset++;
             this.totalBytesRead++;
-
             if (checkBufferSize()) {
               break; // nothing more to read.
             }
-
-            // HANDLE: if we're using quoting we might be dealing with a very dirty file
-            // with quoted newlines in trailing fields. (imagine that)
-            // In that particular case we want to use the same logic we use above
-            // (refactored a bit) to skip these fields.
-
           }
+        }
 
-          if (!checkBufferSize()) {
-            while (data.crLfMatcher.isReturn(this.byteBuffer, this.endBuffer) || data.crLfMatcher
-                .isLineFeed(this.byteBuffer, this.endBuffer)) {
-              this.endBuffer++;
-              this.currentOffset++;
-              this.totalBytesRead++;
-              if (checkBufferSize()) {
-                break; // nothing more to read.
-              }
-            }
-          }
+        // Make sure we start at the right position the next time around.
+        this.startBuffer = this.endBuffer;
+      }
 
-          // Make sure we start at the right position the next time around.
-          this.startBuffer = this.endBuffer;
-        }
 
-        //            incrementLinesInput();
-        if (conversionExceptions != null && conversionExceptions.size() > 0) {
-          // Forward the first exception
-          throw new KettleConversionException(
-              "There were " + conversionExceptions.size() + " conversion errors on line ",
-              conversionExceptions, exceptionFields, outputRowData);
-        }
-        if (outputIndex > 0 && outputIndex < meta.getInputFields().length) {
-          badRecordslogger.addBadRecordsToBilder(outputRowData, meta.getInputFields().length,
-              "Row record is not in valid csv format.", null);
-          continue;
-        } else {
-          return outputRowData;
-        }
+      //            incrementLinesInput();
+      if (conversionExceptions != null && conversionExceptions.size() > 0) {
+        // Forward the first exception
+        throw new KettleConversionException("There were " + conversionExceptions.size()
+            + " conversion errors on line ", conversionExceptions, exceptionFields, outputRowData);
       }
+
+      return outputRowData;
     } catch (KettleConversionException e) {
       throw e;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 525902d..76d5716 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -386,7 +386,8 @@ public class CsvInput extends BaseStep implements StepInterface {
           doProcess();
           LOGGER.info("*****************Completed csv reading by thread***********");
         } catch (Throwable e) {
-          LOGGER.error(e, "Thread is terminated due to error");
+          LOGGER.error(e,
+              "Thread is terminated due to error");
         }
         return null;
       }
@@ -477,9 +478,6 @@ public class CsvInput extends BaseStep implements StepInterface {
       if (blockDataHandler.bufferedInputStream != null) {
         blockDataHandler.bufferedInputStream.close();
       }
-      if (null != blockDataHandler.badRecordslogger) {
-        blockDataHandler.badRecordslogger.closeStreams();
-      }
     } catch (RuntimeException e) {
       throw e;
     } catch (Exception e) {
@@ -588,4 +586,4 @@ public class CsvInput extends BaseStep implements StepInterface {
     return false;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
index b448f02..6f895b1 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
@@ -49,7 +49,13 @@ import org.pentaho.di.resource.ResourceNamingInterface;
 import org.pentaho.di.resource.ResourceReference;
 import org.pentaho.di.trans.Trans;
 import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.*;
+import org.pentaho.di.trans.step.BaseStepMeta;
+import org.pentaho.di.trans.step.StepDataInterface;
+import org.pentaho.di.trans.step.StepInjectionMetaEntry;
+import org.pentaho.di.trans.step.StepInterface;
+import org.pentaho.di.trans.step.StepMeta;
+import org.pentaho.di.trans.step.StepMetaInjectionInterface;
+import org.pentaho.di.trans.step.StepMetaInterface;
 import org.pentaho.di.trans.steps.textfileinput.InputFileMetaInterface;
 import org.pentaho.di.trans.steps.textfileinput.TextFileInputField;
 import org.pentaho.di.trans.steps.textfileinput.TextFileInputMeta;
@@ -90,14 +96,6 @@ public class CsvInputMeta extends BaseStepMeta
   private int currentRestructNumber;
 
   private String blocksID;
-  /**
-   * database name
-   */
-  private String databaseName;
-  /**
-   * tableName
-   */
-  private String tableName;
 
   public CsvInputMeta() {
     super(); // allocate BaseStepMeta
@@ -118,16 +116,10 @@ public class CsvInputMeta extends BaseStepMeta
     bufferSize = "50000";
     currentRestructNumber = -1;
     blocksID = "";
-    databaseName = "";
-    tableName = "";
-
-
   }
 
   private void readData(Node stepnode) throws KettleXMLException {
     try {
-      databaseName = XMLHandler.getTagValue(stepnode, "databaseName");
-      tableName = XMLHandler.getTagValue(stepnode, "tableName");
       filename = XMLHandler.getTagValue(stepnode, getXmlCode("FILENAME"));
       filenameField = XMLHandler.getTagValue(stepnode, getXmlCode("FILENAME_FIELD"));
       rowNumField = XMLHandler.getTagValue(stepnode, getXmlCode("ROW_NUM_FIELD"));
@@ -197,8 +189,7 @@ public class CsvInputMeta extends BaseStepMeta
 
   public String getXML() {
     StringBuffer retval = new StringBuffer(500);
-    retval.append("    ").append(XMLHandler.addTagValue("databaseName", databaseName));
-    retval.append("    ").append(XMLHandler.addTagValue("tableName", tableName));
+
     retval.append("    ").append(XMLHandler.addTagValue(getXmlCode("FILENAME"), filename));
     retval.append("    ")
         .append(XMLHandler.addTagValue(getXmlCode("FILENAME_FIELD"), filenameField));
@@ -257,8 +248,6 @@ public class CsvInputMeta extends BaseStepMeta
   public void readRep(Repository rep, ObjectId idStep, List<DatabaseMeta> databases,
       Map<String, Counter> counters) throws KettleException {
     try {
-      databaseName = rep.getStepAttributeString(idStep, getRepCode("databaseName"));
-      tableName = rep.getStepAttributeString(idStep, getRepCode("tableName"));
       filename = rep.getStepAttributeString(idStep, getRepCode("FILENAME"));
       filenameField = rep.getStepAttributeString(idStep, getRepCode("FILENAME_FIELD"));
       rowNumField = rep.getStepAttributeString(idStep, getRepCode("ROW_NUM_FIELD"));
@@ -309,8 +298,6 @@ public class CsvInputMeta extends BaseStepMeta
   public void saveRep(Repository rep, ObjectId idTransformation, ObjectId idStep)
       throws KettleException {
     try {
-      rep.saveStepAttribute(idTransformation, idStep, getRepCode("databaseName"), databaseName);
-      rep.saveStepAttribute(idTransformation, idStep, getRepCode("databaseName"), tableName);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("FILENAME"), filename);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("FILENAME_FIELD"), filenameField);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("ROW_NUM_FIELD"), rowNumField);
@@ -794,11 +781,7 @@ public class CsvInputMeta extends BaseStepMeta
       //
       String attributeKey = attr.getKey();
       if (entry.getValueType() != ValueMetaInterface.TYPE_NONE) {
-        if ("databaseName".equals(attributeKey)) {
-          databaseName = (String) entry.getValue();
-        } else if ("tableName".equals(attributeKey)) {
-          tableName = (String) entry.getValue();
-        } else if ("FILENAME".equals(attributeKey)) {
+        if ("FILENAME".equals(attributeKey)) {
           filename = (String) entry.getValue();
         } else if ("FILENAME_FIELD".equals(attributeKey)) {
           filenameField = (String) entry.getValue();
@@ -922,35 +905,4 @@ public class CsvInputMeta extends BaseStepMeta
     this.currentRestructNumber = currentRestructNum;
   }
 
-  /**
-   * retuns database name
-   * @return
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * return tableName
-   * @return
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * set databasename
-   * @param databaseName
-   */
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  /**
-   * set tabke name
-   * @param tableName
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index 682b3d1..00f233c 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -440,8 +440,7 @@ public class GraphGenerator {
     csvInputMeta.setBlocksID(this.blocksID);
     csvDataStep.setDraw(true);
     csvDataStep.setDescription("Read raw data from " + GraphGeneratorConstants.CSV_INPUT);
-    csvInputMeta.setDatabaseName(schemaInfo.getSchemaName());
-    csvInputMeta.setTableName(schemaInfo.getCubeName());
+
     return csvDataStep;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index 6ac5722..e40bd25 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -27,8 +27,15 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.sql.Connection;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -362,10 +369,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           columnsInfo.setDimensionColumnIds(meta.getDimensionColumnIds());
           columnsInfo.setColumnSchemaDetailsWrapper(meta.getColumnSchemaDetailsWrapper());
           updateBagLogFileName();
-          csvFilepath = CarbonDataProcessorUtil.getBagLogFileName(csvFilepath);
           String key = meta.getSchemaName() + '/' + meta.getCubeName() + '_' + meta.getTableName();
-          badRecordslogger = new BadRecordslogger(key, csvFilepath, CarbonDataProcessorUtil
-              .getBadLogStoreLocation(meta.getSchemaName() + '/' + meta.getCubeName()));
+          badRecordslogger = new BadRecordslogger(key, csvFilepath,
+              getBadLogStoreLocation(meta.getSchemaName() + '/' + meta.getCubeName()));
 
           columnsInfo.setTimeOrdinalIndices(meta.timeOrdinalIndices);
           surrogateKeyGen = new FileStoreSurrogateKeyGenForCSV(columnsInfo, meta.getPartitionID(),
@@ -694,6 +700,14 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
             meta.getPartitionID(), meta.getSegmentId()+"");
   }
 
+  private String getBadLogStoreLocation(String storeLocation) {
+    String badLogStoreLocation =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
   private void updateBagLogFileName() {
     csvFilepath = new File(csvFilepath).getName();
     if (csvFilepath.indexOf(".") > -1) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c2469db/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index a64a256..8f2c9e7 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -79,6 +79,7 @@ public final class CarbonDataProcessorUtil {
     return fileBufferSize;
   }
 
+
   /**
    * Utility method to get level cardinality string
    *
@@ -153,6 +154,7 @@ public final class CarbonDataProcessorUtil {
     }// CHECKSTYLE:ON
   }
 
+
   public static void checkResult(List<CheckResultInterface> remarks, StepMeta stepMeta,
       String[] input) {
     CheckResult cr;
@@ -274,34 +276,4 @@ public final class CarbonDataProcessorUtil {
     String localDataLoadFolderLocation = carbonDataDirectoryPath + File.separator + taskId;
     return localDataLoadFolderLocation;
   }
-
-  /**
-   * The method returns the bad record store location
-   *
-   * @param storeLocation
-   * @return
-   */
-  public static String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    return badLogStoreLocation;
-  }
-
-  /**
-   * method returns the bad log file name
-   *
-   * @param csvFilepath
-   * @return
-   */
-  public static String getBagLogFileName(String csvFilepath) {
-    csvFilepath = new File(csvFilepath).getName();
-    if (csvFilepath.indexOf(".") > -1) {
-      csvFilepath = csvFilepath.substring(0, csvFilepath.indexOf("."));
-    }
-
-    return csvFilepath + '_' + System.currentTimeMillis() + ".log";
-
-  }
-}
+}
\ No newline at end of file