You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/21 07:30:45 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-965] dataload fail
message is not correct when there is no good data to load
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f59f5ae81 -> 674b71e46
[CARBONDATA-965] dataload fail message is not correct when there is no good data to load
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9ffe1775
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9ffe1775
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9ffe1775
Branch: refs/heads/master
Commit: 9ffe1775ca8bca430cd5c4b9723d4322e2baefaa
Parents: f59f5ae
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Thu Apr 20 19:09:46 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Apr 21 12:59:48 2017 +0530
----------------------------------------------------------------------
.../src/test/resources/badrecords/dummy2.csv | 2 +
.../carbondata/spark/load/CarbonLoaderUtil.java | 42 ++++++++++++++++++++
.../spark/rdd/CarbonDataRDDFactory.scala | 11 +++++
.../DataLoadFailAllTypeSortTest.scala | 36 ++++++++++++++++-
.../AlterTableValidationTestCase.scala | 2 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 ++-
6 files changed, 94 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv b/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
new file mode 100644
index 0000000..a28b362
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/badrecords/dummy2.csv
@@ -0,0 +1,2 @@
+name,dob,weight
+"","",""
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 4806a93..964c536 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -100,6 +100,48 @@ public final class CarbonLoaderUtil {
}
}
+ /**
+ * Returns true if the segment contains at least one carbondata or index file; otherwise returns false.
+ *
+ * @param loadModel
+ * @param currentLoad
+ * @return
+ */
+ public static boolean isValidSegment(CarbonLoadModel loadModel,
+ int currentLoad) {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
+ .getCarbonTable();
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ int fileCount = 0;
+ int partitionCount = carbonTable.getPartitionCount();
+ for (int i = 0; i < partitionCount; i++) {
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+ currentLoad + "");
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+ FileFactory.getFileType(segmentPath));
+ CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(
+ CarbonTablePath.getCarbonIndexExtension())
+ || file.getName().endsWith(
+ CarbonTablePath.getCarbonDataExtension());
+ }
+
+ });
+ fileCount += files.length;
+ if (files.length > 0) {
+ return true;
+ }
+ }
+ if (fileCount == 0) {
+ return false;
+ }
+ return true;
+ }
public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
final boolean isCompactionFlow) throws IOException {
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 631b2a7..4656c2e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -810,6 +810,17 @@ object CarbonDataRDDFactory {
shutdownDictionaryServer(carbonLoadModel, result, false)
throw new Exception(errorMessage)
} else {
+ // if the segment is empty (no data files were written), fail the data load
+ if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, currentLoadCount)) {
+ CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+ LOGGER.info("********clean up done**********")
+ LOGGER.audit(s"Data load is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ " as there is no data to load")
+ LOGGER.warn("Cannot write load metadata file as data load failed")
+ shutdownDictionaryServer(carbonLoadModel, result, false)
+ throw new Exception("No Data to load")
+ }
val metadataDetails = status(0)._2
if (!isAgg) {
val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 478b4d3..0465aa7 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -40,6 +40,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS data_bm")
sql("drop table IF EXISTS data_bmf")
sql("drop table IF EXISTS data_tbm")
+ sql("drop table IF EXISTS data_bm_no_good_data")
}
test("dataload with parallel merge with bad_records_action='FAIL'") {
@@ -122,8 +123,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
"STORED BY 'org.apache.carbondata.format'")
val testData = s"$resourcesPath/badrecords/dummy.csv"
sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm""")
-
-
} catch {
case x: Throwable => {
assert(x.getMessage.contains("Data load failed due to bad record"))
@@ -174,6 +173,38 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
}
}
+ test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='REDIRECT'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ sql("create table data_bm_no_good_data(name String, dob long, weight int) " +
+ "STORED BY 'org.apache.carbondata.format'")
+ val testData = s"$resourcesPath/badrecords/dummy2.csv"
+ sql(
+ s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm_no_good_data options
+ ('IS_EMPTY_DATA_BAD_RECORD'='true','BAD_RECORDS_ACTION'='REDIRECT')""")
+ } catch {
+ case x: Throwable => {
+ assert(x.getMessage.contains("No Data to load"))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
test("dataload with table bucketing with bad_records_action='FAIL'") {
try {
CarbonProperties.getInstance()
@@ -212,6 +243,7 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
sql("drop table IF EXISTS data_bm")
sql("drop table IF EXISTS data_bmf")
sql("drop table IF EXISTS data_tbm")
+ sql("drop table IF EXISTS data_bm_no_good_data")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 93b57c2..90a88f6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -67,7 +67,7 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
s"""LOAD DATA LOCAL INPATH '$resourcesPath/badrecords/datasample.csv' INTO TABLE
|restructure_bad OPTIONS
|('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'bad_records_logger_enable'='true',
- |'bad_records_action'='redirect')"""
+ |'bad_records_action'='force')"""
.stripMargin)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9ffe1775/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 10c5191..95a337a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -116,7 +116,10 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
try {
File[] filesToMergeSort = getFilesToMergeSort();
this.fileCounter = rowPages.length + filesToMergeSort.length + merges.size();
-
+ if (fileCounter == 0) {
+ LOGGER.info("No files to merge sort");
+ return;
+ }
LOGGER.info("Number of row pages: " + this.fileCounter);
// create record holder heap
[2/2] incubator-carbondata git commit: [CARBONDATA-965] data load
fail message is not correct when there is no good data to load. This closes
#824
Posted by ra...@apache.org.
[CARBONDATA-965] data load fail message is not correct when there is no good data to load. This closes #824
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/674b71e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/674b71e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/674b71e4
Branch: refs/heads/master
Commit: 674b71e462463ab9698a809a3c0a45f5465f30ef
Parents: f59f5ae 9ffe177
Author: ravipesala <ra...@gmail.com>
Authored: Fri Apr 21 13:00:30 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Apr 21 13:00:30 2017 +0530
----------------------------------------------------------------------
.../src/test/resources/badrecords/dummy2.csv | 2 +
.../carbondata/spark/load/CarbonLoaderUtil.java | 42 ++++++++++++++++++++
.../spark/rdd/CarbonDataRDDFactory.scala | 11 +++++
.../DataLoadFailAllTypeSortTest.scala | 36 ++++++++++++++++-
.../AlterTableValidationTestCase.scala | 2 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 ++-
6 files changed, 94 insertions(+), 4 deletions(-)
----------------------------------------------------------------------