Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/31 17:58:37 UTC

carbondata git commit: [CARBONDATA-1658] Fixed Thread leak issue in no sort

Repository: carbondata
Updated Branches:
  refs/heads/master 9955bed24 -> b49160935


[CARBONDATA-1658] Fixed Thread leak issue in no sort

Problem: In the no-sort flow the executor service in the writer step is never shut down, which leaks threads. Over a long run this eventually causes an OOM error.
Solution: Shut down the executor service in every case, on success as well as on failure.
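
For illustration only, a minimal JDK-only sketch of this shutdown-in-finally pattern (the class name, pool size and task are made up; this is not the actual CarbonData code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownInFinallySketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = null;  // declared before the try so the finally block can always reach it
        try {
          pool = Executors.newFixedThreadPool(4);
          pool.submit(() -> System.out.println("writing batch on " + Thread.currentThread().getName()));
          // ... submit the remaining writer tasks and wait on their futures ...
        } finally {
          if (pool != null) {
            pool.shutdownNow();                          // runs on both the success and the failure path
            pool.awaitTermination(10, TimeUnit.SECONDS); // give in-flight tasks a bounded time to stop
          }
        }
      }
    }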

This closes #1454


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b4916093
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b4916093
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b4916093

Branch: refs/heads/master
Commit: b49160935a7c3c8fe1899e3e1c49ba7022cb0938
Parents: 9955bed
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Oct 30 20:52:19 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Oct 31 23:27:46 2017 +0530

----------------------------------------------------------------------
 .../src/test/resources/measureinsertintotest.csv     |  7 +++++++
 .../testsuite/sortcolumns/TestSortColumns.scala      | 15 +++++++++++++++
 .../steps/CarbonRowDataWriterProcessorStepImpl.java  | 10 +++++++++-
 3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4916093/integration/spark-common-test/src/test/resources/measureinsertintotest.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/measureinsertintotest.csv b/integration/spark-common-test/src/test/resources/measureinsertintotest.csv
new file mode 100644
index 0000000..06985e8
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/measureinsertintotest.csv
@@ -0,0 +1,7 @@
+id,name,city,age
+1,david,shenzhen,31
+2,eason,shenzhen,27
+3,jarry,wuhan,35
+3,jarry,Bangalore,35
+4,kunal,Delhi,26
+4,vishal,Bangalore,29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4916093/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index b655025..b5fd8a9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.carbondata.spark.testsuite.sortcolumns
 
+import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -29,6 +31,11 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
 
     sql("CREATE TABLE origintable1 (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE origintable1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+
+    sql("CREATE TABLE tableOne(id int, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql("CREATE TABLE tableTwo(id int, age int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table tableOne")
+    sql("insert into table tableTwo select id, count(age) from tableOne group by id")
   }
 
   test("create table sort columns dictionary include - int") {
@@ -327,6 +334,12 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
   assert(exceptionCaught.getMessage.equals("SORT_COLUMNS Either having duplicate columns : empno or it contains illegal argumnet."))
   }
 
+  test("Test tableTwo data") {
+    checkAnswer(
+      sql("select id,age from tableTwo order by id"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+  }
+
   test("Measure columns in sort_columns") {
     val exceptionCaught = intercept[MalformedCarbonCommandException] {
       sql(
@@ -378,6 +391,8 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists test_sort_col")
     sql("drop table if exists test_sort_col_hive")
     sql("drop table if exists sorttable1b")
+    sql("DROP TABLE IF EXISTS tableOne")
+    sql("DROP TABLE IF EXISTS tableTwo")
   }
 
   def setLoadingProperties(offheap: String, unsafe: String, useBatch: String): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4916093/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 7007160..34b4f3d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
@@ -100,6 +101,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     final Iterator<CarbonRowBatch>[] iterators = child.execute();
     tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier();
     tableName = tableIdentifier.getTableName();
+    ExecutorService executorService = null;
     try {
       readCounter = new long[iterators.length];
       writeCounter = new long[iterators.length];
@@ -122,7 +124,9 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (iterators.length == 1) {
         doExecute(iterators[0], 0, 0);
       } else {
-        ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+        executorService = Executors.newFixedThreadPool(iterators.length,
+            new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
+                .getCarbonTableIdentifier().getTableName()));
         Future[] futures = new Future[iterators.length];
         for (int i = 0; i < iterators.length; i++) {
           futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
@@ -141,6 +145,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
         throw new BadRecordFoundException(e.getMessage(), e);
       }
       throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+    } finally {
+      if (null != executorService && !executorService.isShutdown()) {
+        executorService.shutdownNow();
+      }
     }
     return null;
   }
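
The patch also builds the writer pool with a CarbonThreadFactory so the pool threads carry a recognisable name. For illustration, a JDK-only sketch of the same idea (it does not use CarbonData's CarbonThreadFactory; the pool and table names are made up):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedWriterPoolSketch {
      public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        // Tag every pool thread with a recognisable name so a leaked writer
        // thread would stand out immediately in a thread dump.
        ThreadFactory factory =
            r -> new Thread(r, "NoSortDataWriterPool:myTable_" + count.incrementAndGet());
        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        try {
          pool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        } finally {
          pool.shutdownNow();  // without this the pool threads would outlive the data load
        }
      }
    }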