Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/01 14:50:43 UTC

carbondata git commit: [CARBONDATA-2417] SDK writer goes to infinite wait when consumer thread is dead

Repository: carbondata
Updated Branches:
  refs/heads/master 4b98af22d -> 7edef8f4a


[CARBONDATA-2417] SDK writer goes to infinite wait when consumer thread is dead

Problem: the SDK writer goes into an infinite wait when the consumer thread is dead.

Root cause: when an exception occurs in the consumer thread during write (for example, due to a bad record),
the failure never reaches the producer (the SDK writer).
So the SDK keeps writing data, assuming the consumer will consume it. But since the consumer is dead, the queue fills up and queue.put() blocks
forever.

Solution: when the consumer dies, close the writer forcefully and
clear the queue, so the blocked write is released and the remaining
batches are skipped. When writer.close() is later called by the user,
throw an exception indicating that the write failed.

This closes #2251
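
The fix boils down to a force-close pattern on the blocking queue shared by the producer and consumer. A minimal, self-contained sketch of that pattern is below; the class name ForceClosableBatchWriter and the Object[] batch type are placeholders for illustration only, while the queue capacity and the close/isForceClose names mirror the actual change to CarbonOutputIteratorWrapper in the diff that follows.

import java.util.concurrent.ArrayBlockingQueue;

public class ForceClosableBatchWriter {

  // Producer (SDK writer) puts row batches here; the consumer (data load
  // executor) takes them. Capacity 10 mirrors CarbonOutputIteratorWrapper.
  private final ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(10);

  private volatile boolean close = false;

  // Called from the producer thread for every row batch.
  public void write(Object[] batch) throws InterruptedException {
    if (close) {
      // The consumer already failed and force-closed the writer; drop the
      // batch instead of blocking forever on queue.put().
      return;
    }
    queue.put(batch);
  }

  // The consumer's catch block calls closeWriter(true) on failure; the user's
  // normal writer.close() path calls closeWriter(false).
  public void closeWriter(boolean isForceClose) {
    if (close) {
      // Already closed forcefully; nothing left to do.
      return;
    }
    if (isForceClose) {
      // Clearing the queue releases a producer blocked in queue.put(), and
      // the close flag makes it skip all further batches.
      queue.clear();
      close = true;
      return;
    }
    // Normal close path: flush the last partial batch and signal end of data.
    close = true;
  }
}

ArrayBlockingQueue.clear() signals producers waiting for space, so a write() blocked in queue.put() wakes up; the close flag then prevents any further batches from being enqueued, and the load failure is surfaced to the user as an exception when they call writer.close().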


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7edef8f4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7edef8f4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7edef8f4

Branch: refs/heads/master
Commit: 7edef8f4a780b1754ea4afe6794b4ba2ffa06794
Parents: 4b98af2
Author: ajantha-bhat <aj...@gmail.com>
Authored: Sat Apr 28 18:17:35 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 1 20:20:02 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableOutputFormat.java     |  9 ++++++---
 .../TestNonTransactionalCarbonTable.scala       | 20 ++++++++++++++++----
 .../iterator/CarbonOutputIteratorWrapper.java   | 16 +++++++++++++++-
 3 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 36ba02d..7050c8f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -243,7 +243,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
-    ExecutorService executorService = Executors.newFixedThreadPool(1,
+    final ExecutorService executorService = Executors.newFixedThreadPool(1,
         new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
     // It should be started in new thread as the underlying iterator uses blocking queue.
     Future future = executorService.submit(new Thread() {
@@ -252,9 +252,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
           dataLoadExecutor
               .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
         } catch (Exception e) {
+          executorService.shutdownNow();
           dataLoadExecutor.close();
           // clean up the folders and files created locally for data load operation
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
+
+          iteratorWrapper.closeWriter(true);
           throw new RuntimeException(e);
         }
       }
@@ -407,7 +410,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
-      iteratorWrapper.closeWriter();
+      iteratorWrapper.closeWriter(false);
       try {
         future.get();
       } catch (ExecutionException e) {
@@ -419,7 +422,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
       }
-      LOG.info("Closed partition writer task " + taskAttemptContext.getTaskAttemptID());
+      LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
     }
 
     public CarbonLoadModel getLoadModel() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 3adcec8..f1bda31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -43,7 +43,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                             "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
   //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
-  writerPath = writerPath.replace("\\", "/");
+  writerPath = writerPath.replace("\\", "/")
 
   def buildTestDataSingleFile(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -74,7 +74,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   def buildTestDataWithBadRecordFail(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
     var options = Map("bAd_RECords_action" -> "FAIL").asJava
-    buildTestData(3, false, options)
+    buildTestData(15001, false, options)
   }
 
   def buildTestDataWithBadRecordIgnore(): Any = {
@@ -127,7 +127,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         }
       var i = 0
       while (i < rows) {
-        if (options != null){
+        if ((options != null) && (i < 3)) {
           // writing a bad record
           writer.write(Array[String]( "robot" + i, String.valueOf(i.toDouble / 2), "robot"))
         } else {
@@ -141,7 +141,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       }
       writer.close()
     } catch {
-      case ex: Exception => None
+      case ex: Exception => throw new RuntimeException(ex)
+
       case _ => None
     }
   }
@@ -636,4 +637,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
+  test("test huge data write with one batch having bad record") {
+
+    val exception =
+      intercept[RuntimeException] {
+      buildTestDataWithBadRecordFail()
+    }
+    assert(exception.getMessage()
+      .contains("Data load failed due to bad record"))
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 9229598..4067be1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -47,6 +47,10 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
   private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
 
   public void write(Object[] row) throws InterruptedException {
+    if (close) {
+      // already might be closed forcefully
+      return;
+    }
     if (!loadBatch.addRow(row)) {
       loadBatch.readyRead();
       queue.put(loadBatch);
@@ -82,8 +86,18 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
     return readBatch.next();
   }
 
-  public void closeWriter() {
+  public void closeWriter(boolean isForceClose) {
+    if (close) {
+      // already might be closed forcefully
+      return;
+    }
     try {
+      if (isForceClose) {
+        // unblock the queue.put on the other thread and clear the queue.
+        queue.clear();
+        close = true;
+        return;
+      }
       loadBatch.readyRead();
       if (loadBatch.size > 0) {
         queue.put(loadBatch);