Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:31 UTC
[29/45] carbondata git commit: [CARBONDATA-2984][Streaming] Fix NPE when there is no data in the task of a batch
[CARBONDATA-2984][Streaming] Fix NPE when there is no data in the task of a batch
Fix NPE when there is no data in the task of a batch
A streaming batch may contain no data, in which case there is no blocklet to append to the streaming file. The min/max index of the streaming file therefore does not need to be updated; the min/max index of the old file is used as-is.
This closes #2782
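
A minimal, self-contained Java sketch of the core of this fix. The class and method names below are stand-ins, not the actual CarbonData API; only the null-guard mirrors the change to StreamSegment.java shown in the diff: when a task writes no rows, the batch index carries no min/max, so the code falls back to the old file's index instead of dereferencing null.

import java.util.Arrays;

public final class MinMaxMergeSketch {

  // Stand-in for BlockletMinMaxIndex: per-column min/max byte arrays.
  static final class MinMax {
    byte[][] minValues;
    byte[][] maxValues;
    MinMax(byte[][] min, byte[][] max) { minValues = min; maxValues = max; }
  }

  // Stand-in for the index of the just-finished batch; minMax stays null
  // when the task wrote no rows (the empty-batch case that caused the NPE).
  static final class BatchIndex {
    MinMax minMax;
  }

  // Mirrors the added guard: an empty batch keeps the old file's min/max
  // unchanged rather than dereferencing a null blocklet index.
  static MinMax merge(BatchIndex batch, MinMax oldFileIndex) {
    if (batch.minMax == null) {
      return oldFileIndex;
    }
    // The real code merges per-column min and max byte arrays here.
    return batch.minMax;
  }

  public static void main(String[] args) {
    MinMax oldIndex = new MinMax(new byte[][] {{1}}, new byte[][] {{9}});
    MinMax merged = merge(new BatchIndex(), oldIndex); // empty batch
    System.out.println(Arrays.deepToString(merged.minValues)); // prints [[1]]
  }
}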
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa9c8323
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa9c8323
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa9c8323
Branch: refs/heads/branch-1.5
Commit: fa9c8323c11c083452d75886cbbdad1f23d6dfb7
Parents: 0b16816
Author: QiangCai <qi...@qq.com>
Authored: Fri Sep 28 14:48:39 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Oct 3 20:13:50 2018 +0530
----------------------------------------------------------------------
.../TestStreamingTableOperation.scala | 49 +++++++++++++++++++-
.../streaming/CarbonStreamRecordWriter.java | 5 +-
.../streaming/segment/StreamSegment.java | 15 ++++--
3 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 43c1e5a..607c429 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -37,6 +37,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.NoSuchStreamException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
@@ -125,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
+ createTable(tableName = "stream_table_empty", streaming = true, withBatchLoad = false)
+
var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
@@ -213,6 +216,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists streaming.stream_table_reopen")
sql("drop table if exists streaming.stream_table_drop")
sql("drop table if exists streaming.agg_table_block")
+ sql("drop table if exists streaming.stream_table_empty")
}
// normal table not support streaming ingest
@@ -226,7 +230,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.asInstanceOf[CarbonRelation].metaData.carbonTable
var server: ServerSocket = null
try {
- server = getServerSocket
+ server = getServerSocket()
val thread1 = createWriteSocketThread(server, 2, 10, 1)
thread1.start()
// use thread pool to catch the exception of sink thread
@@ -2253,6 +2257,46 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS dim")
}
+ // test empty batch
+ test("test empty batch") {
+ executeStreamingIngest(
+ tableName = "stream_table_empty",
+ batchNums = 1,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 3,
+ continueSeconds = 10,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ autoHandoff = false
+ )
+ var result = sql("select count(*) from streaming.stream_table_empty").collect()
+ assert(result(0).getLong(0) == 10)
+
+ // clean checkpointDir and logDir
+ val carbonTable = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_empty")(spark)
+ FileFactory
+ .deleteAllFilesOfDir(new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
+ FileFactory
+ .deleteAllFilesOfDir(new File(CarbonTablePath
+ .getStreamingCheckpointDir(carbonTable.getTablePath)))
+
+ // some batches don't have data
+ executeStreamingIngest(
+ tableName = "stream_table_empty",
+ batchNums = 1,
+ rowNumsEachBatch = 1,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 10,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ autoHandoff = false
+ )
+ result = sql("select count(*) from streaming.stream_table_empty").collect()
+ assert(result(0).getLong(0) == 11)
+ }
+
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
@@ -2330,7 +2374,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.load()
// Write data from socket stream to carbondata file
- qry = readSocketDF.writeStream
+ // repartition to simulate an empty partition when readSocketDF has only one row
+ qry = readSocketDF.repartition(2).writeStream
.format("carbondata")
.trigger(ProcessingTime(s"$intervalSecond seconds"))
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 0d2a889..672f6a6 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -139,9 +139,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
segmentDir = CarbonTablePath.getSegmentPath(
carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
- }
- private void initializeAtFirstRow() throws IOException, InterruptedException {
// initialize metadata
isNoDictionaryDimensionColumn =
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
@@ -153,6 +151,9 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
measureDataTypes[i] =
dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
}
+ }
+
+ private void initializeAtFirstRow() throws IOException, InterruptedException {
// initialize parser and converter
rowParser = new RowParserImpl(dataFields, configuration);
badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 51417c4..6ee6876 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -458,6 +458,13 @@ public class StreamSegment {
return;
}
+ BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+ // if min/max of new blocklet is null, use min/max of old file
+ if (minMaxIndex == null) {
+ blockletIndex.setMinMaxIndex(fileIndex);
+ return;
+ }
+
DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
for (int index = 0; index < comparators.length; index++) {
@@ -465,11 +472,11 @@ public class StreamSegment {
}
// min value
- byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+ byte[][] minValues = minMaxIndex.getMinValues();
byte[][] mergedMinValues = fileIndex.getMinValues();
if (minValues == null || minValues.length == 0) {
// use file index
- blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+ minMaxIndex.setMinValues(mergedMinValues);
} else if (mergedMinValues != null && mergedMinValues.length != 0) {
if (minValues.length != mergedMinValues.length) {
throw new IOException("the lengths of the min values should be same.");
@@ -494,10 +501,10 @@ public class StreamSegment {
}
// max value
- byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+ byte[][] maxValues = minMaxIndex.getMaxValues();
byte[][] mergedMaxValues = fileIndex.getMaxValues();
if (maxValues == null || maxValues.length == 0) {
- blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+ minMaxIndex.setMaxValues(mergedMaxValues);
} else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
if (maxValues.length != mergedMaxValues.length) {
throw new IOException("the lengths of the max values should be same.");