Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/13 10:39:05 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-903] data load is not failing even though bad records exist in the data in case of unsafe sort or batch sort
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 9efcacdac -> f4fc65199
[CARBONDATA-903] data load is not failing even though bad records exist in the data in case of unsafe sort or batch sort
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/53accb35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/53accb35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/53accb35
Branch: refs/heads/master
Commit: 53accb35685fa959b5262a46518b6e9b0480439f
Parents: 9efcacd
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Apr 11 18:26:51 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:07:58 2017 +0530
----------------------------------------------------------------------
.../DataLoadFailAllTypeSortTest.scala | 218 +++++++++++++++++++
.../newflow/sort/AbstractMergeSorter.java | 43 ++++
.../sort/impl/ParallelReadMergeSorterImpl.java | 18 +-
...arallelReadMergeSorterWithBucketingImpl.java | 16 +-
.../UnsafeBatchParallelReadMergeSorterImpl.java | 43 +++-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 19 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 10 +
7 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
new file mode 100644
index 0000000..478b4d3
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class to verify that a data load fails when bad records exist,
+ * for each sort flow: parallel merge sort, unsafe sort, batch sort
+ * and table bucketing.
+ */
+class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
+ var hiveContext: HiveContext = _
+
+ override def beforeAll: Unit = {
+ sql("drop table IF EXISTS data_pm")
+ sql("drop table IF EXISTS data_um")
+ sql("drop table IF EXISTS data_bm")
+ sql("drop table IF EXISTS data_bmf")
+ sql("drop table IF EXISTS data_tbm")
+ }
+
+ test("dataload with parallel merge with bad_records_action='FAIL'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ sql("create table data_pm(name String, dob long, weight int) " +
+ "STORED BY 'org.apache.carbondata.format'")
+ val testData = s"$resourcesPath/badrecords/dummy.csv"
+ sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_pm""")
+
+
+ } catch {
+ case x: Throwable => {
+ assert(x.getMessage.contains("Data load failed due to bad record"))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
+ test("dataload with ENABLE_UNSAFE_SORT='true' with bad_records_action='FAIL'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ sql("create table data_um(name String, dob long, weight int) " +
+ "STORED BY 'org.apache.carbondata.format'")
+ val testData = s"$resourcesPath/badrecords/dummy.csv"
+ sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_um""")
+
+
+ } catch {
+ case x: Throwable => {
+ assert(x.getMessage.contains("Data load failed due to bad record"))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
+ test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FAIL'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ sql("create table data_bm(name String, dob long, weight int) " +
+ "STORED BY 'org.apache.carbondata.format'")
+ val testData = s"$resourcesPath/badrecords/dummy.csv"
+ sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm""")
+
+
+ } catch {
+ case x: Throwable => {
+ assert(x.getMessage.contains("Data load failed due to bad record"))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
+ test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FORCE'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE");
+ sql("create table data_bmf(name String, dob long, weight int) " +
+ "STORED BY 'org.apache.carbondata.format'")
+ val testData = s"$resourcesPath/badrecords/dummy.csv"
+ sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bmf""")
+
+
+ } catch {
+ case x: Throwable => {
+ assert(false)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false");
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
+ test("dataload with table bucketing with bad_records_action='FAIL'") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+ sql("create table data_tbm(name String, dob long, weight int) " +
+ "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
+ "'bucketcolumns'='name', 'tableName'='data_tbm')")
+ val testData = s"$resourcesPath/badrecords/dummy.csv"
+ sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""")
+
+
+ } catch {
+ case x: Throwable => {
+ assert(x.getMessage.contains("Data load failed due to bad record"))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+ }
+ }
+
+ //
+ override def afterAll {
+ sql("drop table IF EXISTS data_pm")
+ sql("drop table IF EXISTS data_um")
+ sql("drop table IF EXISTS data_bm")
+ sql("drop table IF EXISTS data_bmf")
+ sql("drop table IF EXISTS data_tbm")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+}
\ No newline at end of file
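Each test above follows the same discipline: mutate the global CarbonProperties
singleton, run the load, assert on the bad-record failure (or, for FORCE, on
success), and restore the defaults in a finally block so later tests start from
a clean configuration. Below is a minimal sketch of that discipline in plain
Java, assuming only the CarbonProperties and CarbonCommonConstants APIs the
tests already use; runLoad is a hypothetical stand-in for the LOAD DATA call:

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class BadRecordFailPatternSketch {

  public static void main(String[] args) {
    CarbonProperties props = CarbonProperties.getInstance();
    try {
      // Fail the whole load as soon as a bad record is encountered.
      props.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
      runLoad(); // hypothetical stand-in for the LOAD DATA statement
    } catch (Exception e) {
      // With action FAIL the load is expected to abort with this message.
      if (!e.getMessage().contains("Data load failed due to bad record")) {
        throw new AssertionError("unexpected failure", e);
      }
    } finally {
      // Restore the default so later tests see a clean global configuration.
      props.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
    }
  }

  // Simulates a load that hits a bad record under action FAIL.
  private static void runLoad() {
    throw new RuntimeException("Data load failed due to bad record");
  }
}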
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
new file mode 100644
index 0000000..5179baa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.sort.impl.ThreadStatusObserver;
+
+/**
+ * The class defines the common methods used across the various types of sort.
+ */
+public abstract class AbstractMergeSorter implements Sorter {
+ /**
+ * instance of thread status observer
+ */
+ protected ThreadStatusObserver threadStatusObserver;
+
+ /**
+ * Below method will be used to check whether any sort thread has raised an error, and rethrow it.
+ */
+ public void checkError() {
+ if (threadStatusObserver.getThrowable() != null) {
+ if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
+ throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
+ } else {
+ throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
+ }
+ }
+ }
+}
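AbstractMergeSorter only consumes the observer; the ThreadStatusObserver class
itself is not part of this diff. From the call sites below, its contract is:
worker threads report the first failure through notifyFailed(throwable), and
the driver reads it back through getThrowable(). Here is a hedged sketch of an
observer satisfying just that contract (the real class may differ, e.g. in how
it stops the remaining workers):

import java.util.concurrent.ExecutorService;

// Sketch of the observer contract assumed by AbstractMergeSorter.checkError().
public class ThreadStatusObserverSketch {
  private final ExecutorService executorService;
  private volatile Throwable throwable;

  public ThreadStatusObserverSketch(ExecutorService executorService) {
    this.executorService = executorService;
  }

  // Called by a worker thread on failure; remember the first failure and
  // stop the remaining workers so the load aborts quickly.
  public synchronized void notifyFailed(Throwable t) {
    if (throwable == null) {
      throwable = t;
      executorService.shutdownNow();
    }
  }

  // Polled by the driver (checkError) once the workers are done.
  public Throwable getThrowable() {
    return throwable;
  }
}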
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index ad96578..856b6ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
@@ -47,7 +47,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
* First it sorts the data and writes it to temp files. These temp files will be merge-sorted to get
* the final sorted result.
*/
-public class ParallelReadMergeSorterImpl implements Sorter {
+public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
@@ -58,8 +58,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
private ExecutorService executorService;
- private ThreadStatusObserver threadStatusObserver;
-
private SingleThreadFinalSortFilesMerger finalMerger;
private AtomicLong rowCounter;
@@ -154,18 +152,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
}
/**
- * Below method will be used to check error in exception
- */
- private void checkError() {
- if (threadStatusObserver.getThrowable() != null) {
- if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
- throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
- } else {
- throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
- }
- }
- }
- /**
* Below method will be used to push the sorted data to the next step
*/
private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e3049d2..e5af1c6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
@@ -50,7 +50,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
* This step is specifically for bucketing: it sorts each bucket's data separately and writes it to
* temp files.
*/
-public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
+public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
@@ -100,17 +100,21 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
final int batchSize = CarbonProperties.getInstance().getBatchSize();
try {
for (int i = 0; i < iterators.length; i++) {
- executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter));
+ executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+ this.threadStatusObserver));
}
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
processRowToNextStep(sortDataRows, sortParameters);
} catch (Exception e) {
+ checkError();
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
+ checkError();
try {
intermediateFileMerger.finish();
} catch (CarbonDataWriterException e) {
@@ -197,11 +201,14 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
private AtomicLong rowCounter;
+ private ThreadStatusObserver threadStatusObserver;
+
public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
- AtomicLong rowCounter) {
+ AtomicLong rowCounter, ThreadStatusObserver observer) {
this.iterator = iterator;
this.sortDataRows = sortDataRows;
this.rowCounter = rowCounter;
+ this.threadStatusObserver = observer;
}
@Override public Void call() throws CarbonDataLoadingException {
@@ -222,6 +229,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
}
} catch (Exception e) {
LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
throw new CarbonDataLoadingException(e);
}
return null;
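The driver-side flow is the same in every sorter this patch touches: create the
observer next to the thread pool, pass it to each SortIteratorThread, and call
checkError() both inside the catch block (so a worker's bad-record failure wins
over the secondary shutdown exception) and again after a clean await. Below is
a condensed, self-contained sketch of that flow, with an AtomicReference
standing in for the observer and doSort for the per-iterator sort loop:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SorterDriverSketch {

  public void sort(int threads) {
    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    // Stands in for ThreadStatusObserver: remembers the first worker failure.
    final AtomicReference<Throwable> observer = new AtomicReference<>();
    try {
      for (int i = 0; i < threads; i++) {
        executorService.submit(() -> {
          try {
            doSort(); // stand-in for the per-iterator sort loop
          } catch (Exception e) {
            observer.compareAndSet(null, e); // i.e. notifyFailed(e)
            throw e;
          }
          return null;
        });
      }
      executorService.shutdown();
      executorService.awaitTermination(2, TimeUnit.DAYS);
    } catch (Exception e) {
      checkError(observer); // the worker's failure takes precedence
      throw new RuntimeException("Problem while shutting down the sorter", e);
    }
    checkError(observer); // workers may have failed without interrupting us
  }

  private void checkError(AtomicReference<Throwable> observer) {
    if (observer.get() != null) {
      throw new RuntimeException(observer.get());
    }
  }

  private void doSort() { /* per-thread sort work */ }
}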
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index f3a60fc..a54410c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -36,7 +36,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
@@ -49,7 +49,7 @@ import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE
* It reads data in parallel from an array of iterators and does a merge sort.
* It sorts the data in batches and sends each batch to the next step.
*/
-public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
+public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
@@ -72,18 +72,22 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
@Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throws CarbonDataLoadingException {
this.executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
int batchSize = CarbonProperties.getInstance().getBatchSize();
- final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length);
+ final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length,
+ this.threadStatusObserver);
try {
for (int i = 0; i < iterators.length; i++) {
- executorService
- .submit(new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter));
+ executorService.submit(
+ new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter,
+ this.threadStatusObserver));
}
} catch (Exception e) {
+ checkError();
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
-
+ checkError();
// Creates the iterator to read from merge sorter.
Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() {
@@ -120,12 +124,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
private AtomicLong rowCounter;
+ private ThreadStatusObserver threadStatusObserver;
+
public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows,
- int batchSize, AtomicLong rowCounter) {
+ int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) {
this.iterator = iterator;
this.sortDataRows = sortDataRows;
this.buffer = new Object[batchSize][];
this.rowCounter = rowCounter;
+ this.threadStatusObserver = threadStatusObserver;
}
@Override public Void call() throws CarbonDataLoadingException {
@@ -152,6 +159,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
}
} catch (Exception e) {
LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
throw new CarbonDataLoadingException(e);
} finally {
sortDataRows.finishThread();
@@ -176,10 +184,14 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
private AtomicInteger iteratorCount;
- public SortBatchHolder(SortParameters sortParameters, int numberOfThreads) {
+ private ThreadStatusObserver threadStatusObserver;
+
+ public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+ ThreadStatusObserver threadStatusObserver) {
this.sortParameters = sortParameters;
this.iteratorCount = new AtomicInteger(numberOfThreads);
this.mergerQueue = new LinkedBlockingQueue<>();
+ this.threadStatusObserver = threadStatusObserver;
createSortDataRows();
}
@@ -197,7 +209,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
@Override public UnsafeSingleThreadFinalSortFilesMerger next() {
try {
- return mergerQueue.take();
+ UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger =
+ mergerQueue.take();
+ if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
+ throw new RuntimeException(threadStatusObserver.getThrowable());
+ }
+ return unsafeSingleThreadFinalSortFilesMerger;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -209,6 +226,14 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
public void finish() {
try {
+ // if the mergerQueue is empty and a CarbonDataLoadingException has occurred,
+ // then set the stop-process flag on the finalMerger instance and offer it to the queue
+ if (mergerQueue.isEmpty() && threadStatusObserver != null
+ && threadStatusObserver.getThrowable() != null && threadStatusObserver
+ .getThrowable() instanceof CarbonDataLoadingException) {
+ finalMerger.setStopProcess(true);
+ mergerQueue.offer(finalMerger);
+ }
processRowToNextStep(sortDataRow, sortParameters);
unsafeIntermediateFileMerger.finish();
List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
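The batch sorter needs one extra step because its consumer blocks on
mergerQueue.take(): if a worker fails before anything is produced, finish()
pushes the finalMerger through the queue as a poison pill with the stop flag
set, and next() converts the pill back into the recorded failure instead of
iterating it. A generic sketch of that unblocking pattern follows, with a
hypothetical element type T in place of UnsafeSingleThreadFinalSortFilesMerger:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Generic poison-pill sketch of the pattern used by SortBatchHolder: a failed
// producer still offers one marked element so the blocked consumer wakes up.
public class PoisonPillQueueSketch<T> {

  private static final class Pill<T> {
    final T value;
    final boolean stop; // mirrors isStopProcess() on the final merger
    Pill(T value, boolean stop) { this.value = value; this.stop = stop; }
  }

  private final BlockingQueue<Pill<T>> queue = new LinkedBlockingQueue<>();
  private volatile Throwable failure;

  public void put(T value) throws InterruptedException {
    queue.put(new Pill<>(value, false));
  }

  // Producer's finish path when it failed before producing anything;
  // without this the consumer's take() would block forever.
  public void fail(Throwable t) {
    failure = t;
    queue.offer(new Pill<>(null, true));
  }

  public T take() throws InterruptedException {
    Pill<T> pill = queue.take();
    if (pill.stop) {
      throw new RuntimeException(failure); // surface the producer's error
    }
    return pill.value;
  }
}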
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 18cf314..0caafec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
@@ -49,7 +49,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
* First it sorts the data and writes it to temp files. These temp files will be merge-sorted to get
* the final sorted result.
*/
-public class UnsafeParallelReadMergeSorterImpl implements Sorter {
+public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
@@ -92,18 +92,22 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
try {
for (int i = 0; i < iterators.length; i++) {
- executorService
- .submit(new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter));
+ executorService.submit(
+ new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
+ this.threadStatusObserver));
}
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
processRowToNextStep(sortDataRow, sortParameters);
} catch (Exception e) {
+ checkError();
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
+ checkError();
try {
unsafeIntermediateFileMerger.finish();
List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages();
@@ -182,12 +186,16 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
private AtomicLong rowCounter;
+ private ThreadStatusObserver threadStatusObserver;
+
public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
- UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) {
+ UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
+ ThreadStatusObserver threadStatusObserver) {
this.iterator = iterator;
this.sortDataRows = sortDataRows;
this.buffer = new Object[batchSize][];
this.rowCounter = rowCounter;
+ this.threadStatusObserver = threadStatusObserver;
}
@Override public Void call() throws CarbonDataLoadingException {
@@ -208,6 +216,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter {
}
} catch (Exception e) {
LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
throw new CarbonDataLoadingException(e);
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index b98a072..10c5191 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -80,6 +80,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private String tableName;
+ private boolean isStopProcess;
+
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
this.parameters = parameters;
// set measure and dimension count
@@ -305,4 +307,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
recordHolderHeapLocal = null;
}
}
+
+ public boolean isStopProcess() {
+ return isStopProcess;
+ }
+
+ public void setStopProcess(boolean stopProcess) {
+ isStopProcess = stopProcess;
+ }
}
[2/2] incubator-carbondata git commit: [CARBONDATA-903] data load is not failing even though bad records exist in the data in case of unsafe sort or batch sort. This closes #783
Posted by ra...@apache.org.
[CARBONDATA-903] data load is not failing even though bad records exist in the data in case of unsafe sort or batch sort. This closes #783
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f4fc6519
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f4fc6519
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f4fc6519
Branch: refs/heads/master
Commit: f4fc651990d5f3865ef93dd6fb3ac49920cff0e5
Parents: 9efcacd 53accb3
Author: ravipesala <ra...@gmail.com>
Authored: Thu Apr 13 16:08:43 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Apr 13 16:08:43 2017 +0530
----------------------------------------------------------------------
.../DataLoadFailAllTypeSortTest.scala | 218 +++++++++++++++++++
.../newflow/sort/AbstractMergeSorter.java | 43 ++++
.../sort/impl/ParallelReadMergeSorterImpl.java | 18 +-
...arallelReadMergeSorterWithBucketingImpl.java | 16 +-
.../UnsafeBatchParallelReadMergeSorterImpl.java | 43 +++-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 19 +-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 10 +
7 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------