Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/24 07:54:29 UTC

carbondata git commit: [CARBONDATA-3186] Avoid creating an empty carbondata file when all the records are bad records with action redirect.

Repository: carbondata
Updated Branches:
  refs/heads/master bd752e9d5 -> 10bc5c2ec


[CARBONDATA-3186] Avoid creating an empty carbondata file when all the records are bad records with action redirect.

problem: In the no_sort flow, the writer is opened up front because there is no blocking sort step.
So, when every record goes as a bad record with the redirect action in the converter step,
the writer closes an empty .carbondata file.
When this empty carbondata file is queried, we get multiple issues, including an NPE.

solution: When the file size is 0 bytes, do the following (a minimal sketch follows this list):
a) If there is one data file and one index file: delete the carbondata file and avoid creating the index file.
b) If there are multiple data files and one index file (with a few data files consisting entirely of bad records):
delete the empty carbondata files and remove them from blockIndexInfoList, so the index file will not carry entries for those empty carbon files.
c) If direct write to the store path is enabled: delete the data file from the store path as well, and avoid writing an index file that references that carbondata file.
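
For orientation, here is a minimal, self-contained Java sketch of rules (a) and (b); the class and
parameter names are illustrative stand-ins, not CarbonData's actual types (the real change is in
AbstractFactDataWriter in the diff below):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    final class EmptyDataFileGuard {
      // If the just-closed data file is empty, delete it and drop its pending
      // index entry so the index file never references a 0-byte file.
      static void finishDataFile(Path dataFile, List<String> blockIndexFileNames)
          throws IOException {
        if (Files.size(dataFile) == 0) {
          String name = dataFile.getFileName().toString();
          Files.delete(dataFile);
          int last = blockIndexFileNames.size() - 1;
          if (last >= 0 && blockIndexFileNames.get(last).equals(name)) {
            blockIndexFileNames.remove(last);
          }
        }
      }

      // If every data file of the load was empty, no index entries survive and
      // the index file itself is skipped.
      static boolean shouldWriteIndexFile(List<String> blockIndexFileNames) {
        return !blockIndexFileNames.isEmpty();
      }
    }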

[HOTFIX] Presto NPE when a non-transactional table is cached for s3a/HDFS.
cause: for a non-transactional table, the schema file must not be read.

solution: use the inferred schema instead of checking the schema file (sketched below).
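
A minimal sketch of the cache check after the fix, with hypothetical stand-in fields for
CarbonTableCacheModel (only a transactional table has an on-disk schema file whose timestamp
can be compared; a non-transactional table keeps its inferred schema):

    import java.io.File;

    final class PrestoSchemaGuard {
      // Illustrative stand-in for the cached table model (hypothetical names).
      static final class CachedTable {
        boolean transactional;
        File schemaFile;       // only meaningful for transactional tables
        long cachedSchemaTime;
      }

      // The schema file is consulted only for transactional tables; for a
      // non-transactional table the inferred schema is used as-is, so no
      // schema file is read and the NPE path is never taken.
      static boolean isCacheStale(CachedTable t) {
        if (!t.transactional) {
          return false; // inferred schema: nothing on disk to re-read
        }
        return t.schemaFile.lastModified() != t.cachedSchemaTime;
      }
    }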

This closes #3003


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10bc5c2e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10bc5c2e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10bc5c2e

Branch: refs/heads/master
Commit: 10bc5c2ec69711c12bc379e9f0997d3363543364
Parents: bd752e9
Author: ajantha-bhat <aj...@gmail.com>
Authored: Wed Dec 19 18:27:53 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Mon Dec 24 13:22:41 2018 +0530

----------------------------------------------------------------------
 .../presto/impl/CarbonTableReader.java          |  4 +-
 .../TestNonTransactionalCarbonTable.scala       | 29 ++++++++++++-
 .../store/writer/AbstractFactDataWriter.java    | 45 ++++++++++++++++----
 3 files changed, 65 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/10bc5c2e/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 9677839..363f3f5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -288,8 +288,8 @@ public class CarbonTableReader {
     }
     if (isKeyExists) {
       CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
-      if (carbonTableCacheModel != null
-          && carbonTableCacheModel.carbonTable.getTableInfo() != null) {
+      if (carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null
+          && carbonTableCacheModel.carbonTable.isTransactionalTable()) {
         Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
             .getSchemaFilePath(carbonCache.get().get(schemaTableName).carbonTable.getTablePath()))
             .getLastModifiedTime();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10bc5c2e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a166789..1c211e3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -34,7 +34,7 @@ import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericR
 import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
@@ -119,6 +119,13 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildTestData(rows, options, List("name"))
   }
 
+  def buildTestDataWithOptionsAndEmptySortColumn(rows: Int,
+      options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestData(rows, options, List())
+  }
+
+
   // prepare sdk writer output
   def buildTestData(rows: Int,
       options: util.Map[String, String],
@@ -157,7 +164,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         }
         i += 1
       }
-      if (options != null) {
+      if ((options != null) && sortColumns.nonEmpty) {
         //Keep one valid record. else carbon data file will not generate
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
       }
@@ -439,6 +446,24 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+  test("test create external table with all the records as bad record with redirect") {
+    var options = Map("bAd_RECords_action" -> "REDIRECT").asJava
+    buildTestDataWithOptionsAndEmptySortColumn(3, options)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    // when all the rows are bad records and they are redirected,
+    // empty carbon files must not be created in the no_sort flow
+    var exception = intercept[AnalysisException] {
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+           |'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Invalid table path provided"))
+    cleanTestData()
+  }
+
   test("test create External Table with Schema with partition, should ignore schema and partition") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10bc5c2e/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 31d3b68..8ed0bf7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -178,7 +178,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     this.enableDirectlyWriteDataToStorePath = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
 
     if (enableDirectlyWriteDataToStorePath) {
-      LOGGER.info("Carbondata will directly write fact data to HDFS.");
+      LOGGER.info("Carbondata will directly write fact data to store path.");
     } else {
       LOGGER.info("Carbondata will write temporary fact data to local disk.");
     }
@@ -231,13 +231,13 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
           + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
       writeFooterToFile();
-      this.currentFileSize = 0;
       this.dataChunksOffsets = new ArrayList<>();
       this.dataChunksLength = new ArrayList<>();
       this.blockletMetadata = new ArrayList<>();
       this.blockletIndex = new ArrayList<>();
       commitCurrentFile(false);
       // initialize the new channel
+      this.currentFileSize = 0;
       initializeWriter();
     }
     currentFileSize += blockletSizeToBeAdded;
@@ -272,18 +272,41 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     if (!enableDirectlyWriteDataToStorePath) {
       try {
-        if (copyInCurrentThread) {
-          CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
-              model.getCarbonDataDirectoryPath(), fileSizeInBytes);
-          FileFactory
-              .deleteFile(carbonDataFileTempPath, FileFactory.getFileType(carbonDataFileTempPath));
+        if (currentFileSize == 0) {
+          handleEmptyDataFile(carbonDataFileTempPath);
         } else {
-          executorServiceSubmitList
-              .add(executorService.submit(new CompleteHdfsBackendThread(carbonDataFileTempPath)));
+          if (copyInCurrentThread) {
+            CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
+                model.getCarbonDataDirectoryPath(), fileSizeInBytes);
+            FileFactory.deleteFile(carbonDataFileTempPath,
+                FileFactory.getFileType(carbonDataFileTempPath));
+          } else {
+            executorServiceSubmitList
+                .add(executorService.submit(new CompleteHdfsBackendThread(carbonDataFileTempPath)));
+          }
         }
       } catch (IOException e) {
         LOGGER.error(e);
       }
+    } else {
+      if (currentFileSize == 0) {
+        try {
+          handleEmptyDataFile(carbonDataFileStorePath);
+        } catch (IOException e) {
+          LOGGER.error(e);
+        }
+      }
+    }
+  }
+
+  private void handleEmptyDataFile(String filePath) throws IOException {
+    FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath));
+    if (blockIndexInfoList.size() > 0 && blockIndexInfoList.get(blockIndexInfoList.size() - 1)
+        .getFileName().equals(carbonDataFileName)) {
+      // no need to add this entry to the index file
+      blockIndexInfoList.remove(blockIndexInfoList.size() - 1);
+      // TODO: notifyDataMapBlockEnd() currently has no implementation, so this has
+      // no impact there; once it is implemented, its effect must be undone here too.
     }
   }
 
@@ -374,6 +397,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException data writing
    */
   protected void writeIndexFile() throws IOException, CarbonDataWriterException {
+    if (blockIndexInfoList.size() == 0) {
+      // no need to write the index file when no data file exists.
+      return;
+    }
     // get the header
     IndexHeader indexHeader = CarbonMetadataUtil
         .getIndexHeader(localCardinality, thriftColumnSchemaList, model.getBucketId(),