You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/03 14:44:05 UTC

carbondata git commit: [CARBONDATA-2984][Streaming] Fix NPE when there is no data in the task of a batch

Repository: carbondata
Updated Branches:
  refs/heads/master 0b16816da -> fa9c8323c


[CARBONDATA-2984][Streaming] Fix NPE when there is no data in the task of a batch

Fix NPE when there is no data in the task of a batch

A streaming batch may contain no data, in which case no blocklet needs to be appended to the streaming file. The min/max index of the streaming file therefore does not need to be updated; the min/max index of the old file is used as is.

This closes #2782


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa9c8323
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa9c8323
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa9c8323

Branch: refs/heads/master
Commit: fa9c8323c11c083452d75886cbbdad1f23d6dfb7
Parents: 0b16816
Author: QiangCai <qi...@qq.com>
Authored: Fri Sep 28 14:48:39 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Oct 3 20:13:50 2018 +0530

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 49 +++++++++++++++++++-
 .../streaming/CarbonStreamRecordWriter.java     |  5 +-
 .../streaming/segment/StreamSegment.java        | 15 ++++--
 3 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 43c1e5a..607c429 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -37,6 +37,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
@@ -125,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
 
+    createTable(tableName = "stream_table_empty", streaming = true, withBatchLoad = false)
+
     var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
@@ -213,6 +216,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_reopen")
     sql("drop table if exists streaming.stream_table_drop")
     sql("drop table if exists streaming.agg_table_block")
+    sql("drop table if exists streaming.stream_table_empty")
   }
 
   // normal table not support streaming ingest
@@ -226,7 +230,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       .asInstanceOf[CarbonRelation].metaData.carbonTable
     var server: ServerSocket = null
     try {
-      server = getServerSocket
+      server = getServerSocket()
       val thread1 = createWriteSocketThread(server, 2, 10, 1)
       thread1.start()
       // use thread pool to catch the exception of sink thread
@@ -2253,6 +2257,46 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS dim")
   }
 
+  // test empty batch
+  test("test empty batch") {
+    executeStreamingIngest(
+      tableName = "stream_table_empty",
+      batchNums = 1,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 3,
+      continueSeconds = 10,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    var result = sql("select count(*) from streaming.stream_table_empty").collect()
+    assert(result(0).getLong(0) == 10)
+
+    // clean checkpointDir and logDir
+    val carbonTable = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_empty")(spark)
+    FileFactory
+      .deleteAllFilesOfDir(new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
+    FileFactory
+      .deleteAllFilesOfDir(new File(CarbonTablePath
+        .getStreamingCheckpointDir(carbonTable.getTablePath)))
+
+    // some batches don't have data
+    executeStreamingIngest(
+      tableName = "stream_table_empty",
+      batchNums = 1,
+      rowNumsEachBatch = 1,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 10,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    result = sql("select count(*) from streaming.stream_table_empty").collect()
+    assert(result(0).getLong(0) == 11)
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -2330,7 +2374,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .load()
 
           // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
+          // repartition to simulate an empty partition when readSocketDF has only one row
+          qry = readSocketDF.repartition(2).writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
             .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 0d2a889..672f6a6 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -139,9 +139,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     segmentDir = CarbonTablePath.getSegmentPath(
         carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
-  }
 
-  private void initializeAtFirstRow() throws IOException, InterruptedException {
     // initialize metadata
     isNoDictionaryDimensionColumn =
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
@@ -153,6 +151,9 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       measureDataTypes[i] =
           dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
     }
+  }
+
+  private void initializeAtFirstRow() throws IOException, InterruptedException {
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
     badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 51417c4..6ee6876 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -458,6 +458,13 @@ public class StreamSegment {
       return;
     }
 
+    BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+    // if min/max of new blocklet is null, use min/max of old file
+    if (minMaxIndex == null) {
+      blockletIndex.setMinMaxIndex(fileIndex);
+      return;
+    }
+
     DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
     SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
     for (int index = 0; index < comparators.length; index++) {
@@ -465,11 +472,11 @@ public class StreamSegment {
     }
 
     // min value
-    byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+    byte[][] minValues = minMaxIndex.getMinValues();
     byte[][] mergedMinValues = fileIndex.getMinValues();
     if (minValues == null || minValues.length == 0) {
       // use file index
-      blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+      minMaxIndex.setMinValues(mergedMinValues);
     } else if (mergedMinValues != null && mergedMinValues.length != 0) {
       if (minValues.length != mergedMinValues.length) {
         throw new IOException("the lengths of the min values should be same.");
@@ -494,10 +501,10 @@ public class StreamSegment {
     }
 
     // max value
-    byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+    byte[][] maxValues = minMaxIndex.getMaxValues();
     byte[][] mergedMaxValues = fileIndex.getMaxValues();
     if (maxValues == null || maxValues.length == 0) {
-      blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+      minMaxIndex.setMaxValues(mergedMaxValues);
     } else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
       if (maxValues.length != mergedMaxValues.length) {
         throw new IOException("the lengths of the max values should be same.");