Posted to commits@carbondata.apache.org by zh...@apache.org on 2019/06/20 09:07:53 UTC
[carbondata] branch master updated: [CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object
This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2fb9234 [CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object
2fb9234 is described below
commit 2fb923450f7dbafb0a1ebe321f569bcb320f6426
Author: lamber-ken <22...@qq.com>
AuthorDate: Mon Jun 10 15:34:51 2019 +0800
[CARBONDATA-3422] fix missing complex dimensions when preparing the data from the raw object
fix missing complex dimensions when preparing the data from the raw object
This closes #3270
---
.../TestStreamingTableWithRowParser.scala | 25 +++++++++++++++++++---
.../merger/CompactionResultSortProcessor.java | 12 ++++++-----
2 files changed, 29 insertions(+), 8 deletions(-)
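The root cause, visible in the diff below: when a streaming segment is compacted (for example via ALTER TABLE ... COMPACT 'close_streaming'), CompactionResultSortProcessor.prepareStreamingRowObjectForSorting built its dimension list from segmentProperties.getDimensions() alone, so complex dimensions had no slot in the prepared row. The fix builds a single dimension list in initSortDataRows() that covers both plain and complex dimensions and reuses it in both row-preparation paths. A minimal, self-contained sketch of that pattern (the class name, sample column names, and measure count here are invented for illustration and are not CarbonData APIs):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DimensionListSketch {
      // Stand-ins for segmentProperties.getDimensions() and
      // segmentProperties.getComplexDimensions().
      static final List<String> PLAIN_DIMS = Arrays.asList("id", "name", "city");
      static final List<String> COMPLEX_DIMS = Arrays.asList("file");

      public static void main(String[] args) {
        // Build the dimension list once, covering plain and complex columns,
        // so every row-preparation path sizes its output array the same way.
        List<String> dimensions = new ArrayList<>();
        dimensions.addAll(PLAIN_DIMS);
        dimensions.addAll(COMPLEX_DIMS);

        int measureCount = 2;  // e.g. salary, tax
        Object[] preparedRow = new Object[dimensions.size() + measureCount];
        // Before the fix, the streaming path used PLAIN_DIMS only, so this
        // array had no slots for the complex columns and they were dropped.
        System.out.println("prepared row width: " + preparedRow.length);  // 6
      }
    }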
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 5d806a3..71d94b7 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.carbondatafalse
+package org.apache.spark.carbondata
-import java.io.{File, PrintWriter}
+import java.io.PrintWriter
import java.math.BigDecimal
import java.net.{BindException, ServerSocket}
import java.sql.{Date, Timestamp}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
@@ -679,6 +679,25 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
Row("city_3", 2, 100000006, 21, 30000.0, 0.3)))
}
+ test("alter on stream table with dictionary, sort_columns and complex column") {
+ executeStreamingIngest(
+ tableName = "stream_table_filter_complex",
+ batchNums = 2,
+ rowNumsEachBatch = 25,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ generateBadRecords = true,
+ badRecordAction = "force",
+ autoHandoff = false
+ )
+
+ sql("SHOW SEGMENTS FOR TABLE streaming1.stream_table_filter_complex").show
+ sql("ALTER TABLE streaming1.stream_table_filter_complex COMPACT 'close_streaming'")
+ sql("SHOW SEGMENTS FOR TABLE streaming1.stream_table_filter_complex").show
+
+ }
+
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
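Judging from the helper parameters above, the new test streams two batches of 25 rows into the suite's existing stream_table_filter_complex table (which, per the test name, has a dictionary column, sort_columns, and a complex column) and then closes the streaming segment with ALTER TABLE ... COMPACT 'close_streaming'. That statement drives the streaming segment through CompactionResultSortProcessor, the class fixed below, where the prepared rows previously lost the complex column. The SHOW SEGMENTS calls print the segment status before and after, and the test passes as long as the compaction completes without error.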
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 7b3381c..0c5c7d0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -130,6 +130,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private int dimensionColumnCount;
/**
+ * all dimensions in the table, including complex dimensions
+ */
+ private List<CarbonDimension> dimensions;
+ /**
* whether the allocated tasks has any record
*/
private boolean isRecordFound;
@@ -268,7 +272,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* @return
*/
private Object[] prepareStreamingRowObjectForSorting(Object[] row) {
- List<CarbonDimension> dimensions = segmentProperties.getDimensions();
Object[] preparedRow = new Object[dimensions.size() + measureCount];
for (int i = 0; i < dimensions.size(); i++) {
CarbonDimension dims = dimensions.get(i);
@@ -307,9 +310,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
private Object[] prepareRowObjectForSorting(Object[] row) {
ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
// ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
- List<CarbonDimension> dimensions = new ArrayList<>();
- dimensions.addAll(segmentProperties.getDimensions());
- dimensions.addAll(segmentProperties.getComplexDimensions());
Object[] preparedRow = new Object[dimensions.size() + measureCount];
// convert the dictionary from MDKey to surrogate key
byte[] dictionaryKey = wrapper.getDictionaryKey();
@@ -431,7 +431,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private void initSortDataRows() throws Exception {
measureCount = carbonTable.getMeasureByTableName(tableName).size();
- List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+ dimensions = new ArrayList<>(2);
+ dimensions.addAll(segmentProperties.getDimensions());
+ dimensions.addAll(segmentProperties.getComplexDimensions());
noDictionaryColMapping = new boolean[dimensions.size()];
sortColumnMapping = new boolean[dimensions.size()];
isVarcharDimMapping = new boolean[dimensions.size()];
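A note on the shape of the fix: the combined list is now created once in initSortDataRows() and stored in the new dimensions field, so prepareRowObjectForSorting no longer allocates and repopulates a list for every row, prepareStreamingRowObjectForSorting sees the complex dimensions it previously missed, and the noDictionaryColMapping, sortColumnMapping, and isVarcharDimMapping arrays are sized from the same list that both methods iterate. Keeping a single source of truth for the dimension layout is what prevents the two preparation paths from drifting apart again.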