Posted to commits@carbondata.apache.org by zh...@apache.org on 2019/06/20 09:07:53 UTC

[carbondata] branch master updated: [CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fb9234  [CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object
2fb9234 is described below

commit 2fb923450f7dbafb0a1ebe321f569bcb320f6426
Author: lamber-ken <22...@qq.com>
AuthorDate: Mon Jun 10 15:34:51 2019 +0800

    [CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object
    
    Complex dimensions were dropped when rows from a streaming segment were
    prepared for sorting during compaction, because
    prepareStreamingRowObjectForSorting built its dimension list from
    segmentProperties.getDimensions() alone. Build the list once in
    initSortDataRows(), including segmentProperties.getComplexDimensions(),
    and reuse it in both row-preparation paths.
    
    This closes #3270
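
    In essence, the patch replaces per-method dimension lists with one
    shared list that also covers complex dimensions. A minimal sketch of
    that pattern, for readers skimming the diff below: the helper class
    and method name here are illustrative only; the two SegmentProperties
    getters are the ones the patch itself calls.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.datastore.block.SegmentProperties;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;

    // Hypothetical helper, mirroring the fixed initialization in
    // CompactionResultSortProcessor.initSortDataRows(): build the dimension
    // list once, and include complex dimensions so no column is dropped.
    final class DimensionListSketch {
      static List<CarbonDimension> allDimensions(SegmentProperties segmentProperties) {
        List<CarbonDimension> dimensions = new ArrayList<>();
        // plain dimensions only -- before the fix, the streaming path
        // treated this as the whole list
        dimensions.addAll(segmentProperties.getDimensions());
        // complex (array/struct/map) dimensions -- the part that was missing
        dimensions.addAll(segmentProperties.getComplexDimensions());
        return dimensions;
      }
    }

    Both prepareRowObjectForSorting and prepareStreamingRowObjectForSorting
    then size their output as new Object[dimensions.size() + measureCount],
    so the two paths can no longer disagree about how many dimension slots
    a row has.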
---
 .../TestStreamingTableWithRowParser.scala          | 25 +++++++++++++++++++---
 .../merger/CompactionResultSortProcessor.java      | 12 ++++++-----
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 5d806a3..71d94b7 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.carbondatafalse
+package org.apache.spark.carbondata
 
-import java.io.{File, PrintWriter}
+import java.io.PrintWriter
 import java.math.BigDecimal
 import java.net.{BindException, ServerSocket}
 import java.sql.{Date, Timestamp}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
@@ -679,6 +679,25 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
         Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
   }
 
+  test("alter on stream table with dictionary, sort_columns and complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_filter_complex",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+
+    sql("SHOW SEGMENTS FOR TABLE streaming1.stream_table_filter_complex").show
+    sql("ALTER TABLE streaming1.stream_table_filter_complex COMPACT 'close_streaming'")
+    sql("SHOW SEGMENTS FOR TABLE streaming1.stream_table_filter_complex").show
+
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 7b3381c..0c5c7d0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -130,6 +130,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private int dimensionColumnCount;
   /**
+   * all dimensions (plain and complex) in the table
+   */
+  private List<CarbonDimension> dimensions;
+  /**
    * whether the allocated tasks has any record
    */
   private boolean isRecordFound;
@@ -268,7 +272,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * @return
    */
   private Object[] prepareStreamingRowObjectForSorting(Object[] row) {
-    List<CarbonDimension> dimensions = segmentProperties.getDimensions();
     Object[] preparedRow = new Object[dimensions.size() + measureCount];
     for (int i = 0; i < dimensions.size(); i++) {
       CarbonDimension dims = dimensions.get(i);
@@ -307,9 +310,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private Object[] prepareRowObjectForSorting(Object[] row) {
     ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
     // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
-    List<CarbonDimension> dimensions = new ArrayList<>();
-    dimensions.addAll(segmentProperties.getDimensions());
-    dimensions.addAll(segmentProperties.getComplexDimensions());
     Object[] preparedRow = new Object[dimensions.size() + measureCount];
     // convert the dictionary from MDKey to surrogate key
     byte[] dictionaryKey = wrapper.getDictionaryKey();
@@ -431,7 +431,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private void initSortDataRows() throws Exception {
     measureCount = carbonTable.getMeasureByTableName(tableName).size();
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    dimensions = new ArrayList<>(2);
+    dimensions.addAll(segmentProperties.getDimensions());
+    dimensions.addAll(segmentProperties.getComplexDimensions());
     noDictionaryColMapping = new boolean[dimensions.size()];
     sortColumnMapping = new boolean[dimensions.size()];
     isVarcharDimMapping = new boolean[dimensions.size()];
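
A closing note on this last hunk, as I read it: the per-dimension mapping
arrays and the prepared row must all be sized from the same list, otherwise
complex columns fall off the end of the prepared row. A compilable sketch of
that sizing invariant, with field names borrowed from the diff and all
per-dimension logic elided (the class and method are hypothetical):

    import java.util.List;

    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;

    // Sketch only: every per-dimension structure is derived from one list.
    final class RowSizingSketch {
      static Object[] newPreparedRow(List<CarbonDimension> dimensions, int measureCount) {
        // In the real processor these are fields filled in per dimension;
        // here they only illustrate that all sizes come from the same list.
        boolean[] noDictionaryColMapping = new boolean[dimensions.size()];
        boolean[] sortColumnMapping = new boolean[dimensions.size()];
        boolean[] isVarcharDimMapping = new boolean[dimensions.size()];
        return new Object[dimensions.size() + measureCount];
      }
    }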