You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/18 08:44:59 UTC

[1/3] carbondata git commit: [CARBONDATA-1177]Fixed batch sort synchronization issue

Repository: carbondata
Updated Branches:
  refs/heads/branch-1.1 c05523d0d -> 8ceb069ed


[CARBONDATA-1177]Fixed batch sort synchronization issue


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6468f73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6468f73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6468f73

Branch: refs/heads/branch-1.1
Commit: a6468f73bf74a2afb0a4d2c97664e127f91d69bd
Parents: c05523d
Author: dhatchayani <dh...@gmail.com>
Authored: Thu Jun 15 10:03:08 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:13:37 2017 +0530

----------------------------------------------------------------------
 .../UnsafeBatchParallelReadMergeSorterImpl.java | 36 ++++++++++++++++----
 1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6468f73/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index f1b4a80..c3243b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.processing.newflow.sort.impl;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -44,6 +46,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleT
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * It parallely reads data from array of iterates and do merge sort.
@@ -184,11 +187,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private AtomicInteger iteratorCount;
 
+    private int batchCount;
+
     private ThreadStatusObserver threadStatusObserver;
 
+    private final Object lock = new Object();
+
     public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
         ThreadStatusObserver threadStatusObserver) {
-      this.sortParameters = sortParameters;
+      this.sortParameters = sortParameters.getCopy();
       this.iteratorCount = new AtomicInteger(numberOfThreads);
       this.mergerQueue = new LinkedBlockingQueue<>();
       this.threadStatusObserver = threadStatusObserver;
@@ -197,6 +204,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
     private void createSortDataRows() {
       int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+      setTempLocation(sortParameters);
       this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
           sortParameters.getTempFileLocation());
       unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
@@ -208,6 +216,16 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new CarbonDataLoadingException(e);
       }
+      batchCount++;
+    }
+
+    private void setTempLocation(SortParameters parameters) {
+      String carbonDataDirectoryPath = CarbonDataProcessorUtil
+          .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
+            parameters.getSegmentId(), false);
+      parameters.setTempFileLocation(
+          carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     }
 
     @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
@@ -235,7 +253,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             && threadStatusObserver.getThrowable() != null && threadStatusObserver
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
-          mergerQueue.offer(finalMerger);
+          mergerQueue.put(finalMerger);
         }
         processRowToNextStep(sortDataRow, sortParameters);
         unsafeIntermediateFileMerger.finish();
@@ -243,7 +261,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             unsafeIntermediateFileMerger.getMergedPages());
         unsafeIntermediateFileMerger.close();
-        mergerQueue.offer(finalMerger);
+        mergerQueue.put(finalMerger);
         sortDataRow = null;
         unsafeIntermediateFileMerger = null;
         finalMerger = null;
@@ -251,16 +269,20 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         throw new CarbonDataLoadingException(e);
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new CarbonDataLoadingException(e);
+      } catch (InterruptedException e) {
+        throw new CarbonDataLoadingException(e);
       }
     }
 
-    public synchronized void finishThread() {
-      if (iteratorCount.decrementAndGet() <= 0) {
-        finish();
+    public void finishThread() {
+      synchronized (lock) {
+        if (iteratorCount.decrementAndGet() <= 0) {
+          finish();
+        }
       }
     }
 
-    public synchronized boolean hasNext() {
+    public boolean hasNext() {
       return iteratorCount.get() > 0 || !mergerQueue.isEmpty();
     }
 


[2/3] carbondata git commit: Fixed batch load issue count and synchronization

Posted by ra...@apache.org.
Fixed batch load issue count and synchronization


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f7015212
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f7015212
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f7015212

Branch: refs/heads/branch-1.1
Commit: f7015212d10c73c287e28640cb4545158b1ad318
Parents: a6468f7
Author: ravipesala <ra...@gmail.com>
Authored: Thu Jun 15 16:32:09 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:13:48 2017 +0530

----------------------------------------------------------------------
 .../sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7015212/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c3243b6..cc7929d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -197,7 +197,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         ThreadStatusObserver threadStatusObserver) {
       this.sortParameters = sortParameters.getCopy();
       this.iteratorCount = new AtomicInteger(numberOfThreads);
-      this.mergerQueue = new LinkedBlockingQueue<>();
+      this.mergerQueue = new LinkedBlockingQueue<>(1);
       this.threadStatusObserver = threadStatusObserver;
       createSortDataRows();
     }
@@ -254,6 +254,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
             .getThrowable() instanceof CarbonDataLoadingException) {
           finalMerger.setStopProcess(true);
           mergerQueue.put(finalMerger);
+          return;
         }
         processRowToNextStep(sortDataRow, sortParameters);
         unsafeIntermediateFileMerger.finish();
@@ -270,6 +271,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
       } catch (CarbonSortKeyAndGroupByException e) {
         throw new CarbonDataLoadingException(e);
       } catch (InterruptedException e) {
+        // if fails to put in queue because of interrupted exception, we can offer to free the main
+        // thread from waiting.
+        if (finalMerger != null) {
+          finalMerger.setStopProcess(true);
+          mergerQueue.offer(finalMerger);
+        }
         throw new CarbonDataLoadingException(e);
       }
     }


[3/3] carbondata git commit: Fixed delete with subquery issue

Posted by ra...@apache.org.
Fixed delete with subquery issue


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8ceb069e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8ceb069e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8ceb069e

Branch: refs/heads/branch-1.1
Commit: 8ceb069ed98f97c31dfbdab1be7cde223a6a4a4c
Parents: f701521
Author: ravipesala <ra...@gmail.com>
Authored: Sun Jun 18 00:49:40 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Jun 18 14:14:04 2017 +0530

----------------------------------------------------------------------
 .../iud/DeleteCarbonTableTestCase.scala         | 24 --------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 39 ++++++------
 .../iud/DeleteCarbonTableSubqueryTestCase.scala | 63 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 0346067..2e59c9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -25,8 +25,6 @@ import org.apache.carbondata.core.util.CarbonProperties
 
 class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
     sql("use default")
     sql("drop database  if exists iud_db cascade")
     sql("create database  iud_db")
@@ -97,26 +95,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     )
   }
 
-//  test("delete data from  carbon table[where IN (sub query) ]") {
-//    sql("""drop table if exists iud_db.dest""")
-//    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
-//    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
-//    sql("""delete from  iud_db.dest where c1 IN (select c11 from source2)""").show(truncate = false)
-//    checkAnswer(
-//      sql("""select c1 from iud_db.dest"""),
-//      Seq(Row("c"), Row("d"), Row("e"))
-//    )
-//  }
-//  test("delete data from  carbon table[where IN (sub query with where clause) ]") {
-//    sql("""drop table if exists iud_db.dest""")
-//    sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
-//    sql("""LOAD DATA LOCAL INPATH './src/test/resources/IUD/dest.csv' INTO table iud_db.dest""")
-//    sql("""delete from  iud_db.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show()
-//    checkAnswer(
-//      sql("""select c1 from iud_db.dest"""),
-//      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
-//    )
-//  }
   test("delete data from  carbon table[where numeric condition  ]") {
     sql("""drop table if exists iud_db.dest""")
     sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
@@ -128,8 +106,6 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     )
   }
   override def afterAll {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
     sql("use default")
     sql("drop database  if exists iud_db cascade")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 7a6c513..ae2e46b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -86,29 +86,34 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = {
-    val output = plan match {
+    val output = plan.transform {
       case proj@Project(cols, Join(
       left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) =>
         var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty
-        val newCols = cols.map { col =>
-          col match {
-            case a@Alias(s: ScalaUDF, name)
-              if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
-                  name.equalsIgnoreCase(
-                    CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
-              projectionToBeAdded :+= a
-              AttributeReference(name, StringType, true)().withExprId(a.exprId)
+        var udfExists = false
+        val newCols = cols.map {
+          case a@Alias(s: ScalaUDF, name)
+            if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+                name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+            udfExists = true
+            projectionToBeAdded :+= a
+            AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId)
+          case other => other
+        }
+        if (udfExists) {
+          val newLeft = left match {
+            case Project(columns, logicalPlan) =>
+              Project(columns ++ projectionToBeAdded, logicalPlan)
+            case filter: Filter =>
+              Project(filter.output ++ projectionToBeAdded, filter)
+            case relation: LogicalRelation =>
+              Project(relation.output ++ projectionToBeAdded, relation)
             case other => other
           }
+          Project(newCols, Join(newLeft, right, jointype, condition))
+        } else {
+          proj
         }
-        val newLeft = left match {
-          case Project(columns, logicalPlan) =>
-            Project(columns ++ projectionToBeAdded, logicalPlan)
-          case filter: Filter =>
-            Project(filter.output ++ projectionToBeAdded, filter)
-          case other => other
-        }
-        Project(newCols, Join(newLeft, right, jointype, condition))
       case other => other
     }
     output

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8ceb069e/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
new file mode 100644
index 0000000..ff6196c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.iud
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class DeleteCarbonTableSubqueryTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("use default")
+    sql("drop database  if exists iud_db_sub cascade")
+    sql("create database  iud_db_sub")
+
+    sql("""create table iud_db_sub.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud_db_sub.source2""")
+    sql("use iud_db_sub")
+  }
+
+  test("delete data from  carbon table[where IN (sub query) ]") {
+    sql("""drop table if exists iud_db_sub.dest""")
+    sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""")
+    sql("""delete from  iud_db_sub.dest where c1 IN (select c11 from source2)""").show(truncate = false)
+    checkAnswer(
+      sql("""select c1 from iud_db_sub.dest"""),
+      Seq(Row("c"), Row("d"), Row("e"))
+    )
+  }
+
+  test("delete data from  carbon table[where IN (sub query with where clause) ]") {
+    sql("""drop table if exists iud_db_sub.dest""")
+    sql("""create table iud_db_sub.dest (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""").show()
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud_db_sub.dest""")
+    sql("""delete from  iud_db_sub.dest where c1 IN (select c11 from source2 where c11 = 'b')""").show()
+    checkAnswer(
+      sql("""select c1 from iud_db_sub.dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+
+  override def afterAll {
+    sql("use default")
+    sql("drop database  if exists iud_db_sub cascade")
+  }
+}
\ No newline at end of file