You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/22 16:45:24 UTC

[42/50] [abbrv] carbondata git commit: [CARBONDATA-2198] Fixed bug for streaming data for bad_records_action as REDIRECT or IGNORE

[CARBONDATA-2198] Fixed bug for streaming data for bad_records_action as REDIRECT or IGNORE

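1. Refactored the streaming record writer so that, when bad_records_action is IGNORE or REDIRECT, a row for which the converter returns null is skipped instead of being written to the stream blocklet
2. Added test cases for bad_records_action IGNORE and REDIRECT on streaming tables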

This closes #2014


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59693123
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59693123
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59693123

Branch: refs/heads/branch-1.4
Commit: 59693123da1c7aad17c284887e7819235427af74
Parents: 3394128
Author: Geetika Gupta <ge...@knoldus.in>
Authored: Wed Feb 28 16:09:48 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 22 10:30:00 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/row/CarbonRow.java           |   4 +
 .../TestStreamingTableOperation.scala           |  76 ++++++++++-
 .../streaming/CarbonStreamRecordWriter.java     | 126 ++++++++++---------
 .../streaming/StreamBlockletWriter.java         |   5 +
 4 files changed, 148 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index bb624af..82f004f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -91,4 +91,8 @@ public class CarbonRow implements Serializable {
   public void setRangeId(short rangeId) {
     this.rangeId = rangeId;
   }
+
+  public void clearData() {
+    this.data = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index f46505a..325722d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -45,8 +45,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   private val spark = sqlContext.sparkSession
   private val dataFilePath = s"$resourcesPath/streamSample.csv"
+  def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
+    .getCanonicalPath
+  val badRecordFilePath: File =new File(currentPath + "/target/test/badRecords")
 
   override def beforeAll {
+    badRecordFilePath.mkdirs()
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -1562,6 +1566,68 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult("true")(resultStreaming(0).getString(1).trim)
   }
 
+
+  test("test bad_record_action IGNORE on streaming table") {
+
+sql("drop table if exists streaming.bad_record_ignore")
+    sql(
+      s"""
+         | CREATE TABLE streaming.bad_record_ignore(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('streaming'='true')
+         | """.stripMargin)
+
+    executeStreamingIngest(
+      tableName = "bad_record_ignore",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 8,
+      generateBadRecords = true,
+      badRecordAction = "ignore",
+      autoHandoff = false
+    )
+
+    checkAnswer(sql("select count(*) from streaming.bad_record_ignore"), Seq(Row(19)))
+  }
+
+  test("test bad_record_action REDIRECT on streaming table") {
+    sql("drop table if exists streaming.bad_record_redirect")
+    sql(
+      s"""
+         | CREATE TABLE streaming.bad_record_redirect(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('streaming'='true')
+         | """.stripMargin)
+
+    executeStreamingIngest(
+      tableName = "bad_record_redirect",
+      batchNums = 2,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 8,
+      generateBadRecords = true,
+      badRecordAction = "redirect",
+      autoHandoff = false,
+      badRecordsPath = badRecordFilePath.getCanonicalPath
+    )
+    assert(new File(badRecordFilePath.getCanonicalFile + "/streaming/bad_record_redirect").isDirectory)
+    checkAnswer(sql("select count(*) from streaming.bad_record_redirect"), Seq(Row(19)))
+  }
+
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -1625,7 +1691,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
       handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
-      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean,
+      badRecordsPath: String = CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
   ): Thread = {
     new Thread() {
       override def run(): Unit = {
@@ -1643,6 +1710,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
             .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("bad_records_action", badRecordAction)
+            .option("BAD_RECORD_PATH", badRecordsPath)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
@@ -1676,7 +1744,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       generateBadRecords: Boolean,
       badRecordAction: String,
       handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
-      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean,
+      badRecordsPath: String = CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
   ): Unit = {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
@@ -1698,7 +1767,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
         handoffSize = handoffSize,
-        autoHandoff = autoHandoff)
+        autoHandoff = autoHandoff,
+        badRecordsPath = badRecordsPath)
       thread1.start()
       thread2.start()
       Thread.sleep(continueSeconds * 1000)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 4e555d3..4653445 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -179,75 +179,81 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       initializeAtFirstRow();
     }
 
-    // parse and convert row
-    currentRow.setData(rowParser.parseRow((Object[]) value));
-    converter.convert(currentRow);
-
     // null bit set
     nullBitSet.clear();
-    for (int i = 0; i < dataFields.length; i++) {
-      if (null == currentRow.getObject(i)) {
-        nullBitSet.set(i);
+    Object[] rowData = (Object[]) value;
+    currentRow.setRawData(rowData);
+    // parse and convert row
+    currentRow.setData(rowParser.parseRow(rowData));
+    CarbonRow updatedCarbonRow = converter.convert(currentRow);
+    if (updatedCarbonRow == null) {
+      output.skipRow();
+      currentRow.clearData();
+    } else {
+      for (int i = 0; i < dataFields.length; i++) {
+        if (null == currentRow.getObject(i)) {
+          nullBitSet.set(i);
+        }
       }
-    }
-    output.nextRow();
-    byte[] b = nullBitSet.toByteArray();
-    output.writeShort(b.length);
-    if (b.length > 0) {
-      output.writeBytes(b);
-    }
-    int dimCount = 0;
-    Object columnValue;
+      output.nextRow();
+      byte[] b = nullBitSet.toByteArray();
+      output.writeShort(b.length);
+      if (b.length > 0) {
+        output.writeBytes(b);
+      }
+      int dimCount = 0;
+      Object columnValue;
 
-    // primitive type dimension
-    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-      columnValue = currentRow.getObject(dimCount);
-      if (null != columnValue) {
-        if (isNoDictionaryDimensionColumn[dimCount]) {
+      // primitive type dimension
+      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+        columnValue = currentRow.getObject(dimCount);
+        if (null != columnValue) {
+          if (isNoDictionaryDimensionColumn[dimCount]) {
+            byte[] col = (byte[]) columnValue;
+            output.writeShort(col.length);
+            output.writeBytes(col);
+          } else {
+            output.writeInt((int) columnValue);
+          }
+        }
+      }
+      // complex type dimension
+      for (; dimCount < dimensionWithComplexCount; dimCount++) {
+        columnValue = currentRow.getObject(dimCount);
+        if (null != columnValue) {
           byte[] col = (byte[]) columnValue;
           output.writeShort(col.length);
           output.writeBytes(col);
-        } else {
-          output.writeInt((int) columnValue);
         }
       }
-    }
-    // complex type dimension
-    for (; dimCount < dimensionWithComplexCount; dimCount++) {
-      columnValue = currentRow.getObject(dimCount);
-      if (null != columnValue) {
-        byte[] col = (byte[]) columnValue;
-        output.writeShort(col.length);
-        output.writeBytes(col);
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++) {
-      columnValue = currentRow.getObject(dimCount + msrCount);
-      if (null != columnValue) {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          output.writeBoolean((boolean) columnValue);
-        } else if (dataType == DataTypes.SHORT) {
-          output.writeShort((short) columnValue);
-        } else if (dataType == DataTypes.INT) {
-          output.writeInt((int) columnValue);
-        } else if (dataType == DataTypes.LONG) {
-          output.writeLong((long) columnValue);
-        } else if (dataType == DataTypes.DOUBLE) {
-          output.writeDouble((double) columnValue);
-        } else if (DataTypes.isDecimal(dataType)) {
-          BigDecimal val = (BigDecimal) columnValue;
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-          output.writeShort(bigDecimalInBytes.length);
-          output.writeBytes(bigDecimalInBytes);
-        } else {
-          String msg =
-              "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
-                  .getName();
-          LOGGER.error(msg);
-          throw new IOException(msg);
+      // measure
+      DataType dataType;
+      for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+        columnValue = currentRow.getObject(dimCount + msrCount);
+        if (null != columnValue) {
+          dataType = measureDataTypes[msrCount];
+          if (dataType == DataTypes.BOOLEAN) {
+            output.writeBoolean((boolean) columnValue);
+          } else if (dataType == DataTypes.SHORT) {
+            output.writeShort((short) columnValue);
+          } else if (dataType == DataTypes.INT) {
+            output.writeInt((int) columnValue);
+          } else if (dataType == DataTypes.LONG) {
+            output.writeLong((long) columnValue);
+          } else if (dataType == DataTypes.DOUBLE) {
+            output.writeDouble((double) columnValue);
+          } else if (DataTypes.isDecimal(dataType)) {
+            BigDecimal val = (BigDecimal) columnValue;
+            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+            output.writeShort(bigDecimalInBytes.length);
+            output.writeBytes(bigDecimalInBytes);
+          } else {
+            String msg =
+                "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+                .getName();
+            LOGGER.error(msg);
+            throw new IOException(msg);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 509e2aa..7b2176b 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -76,6 +76,11 @@ public class StreamBlockletWriter {
     rowIndex++;
   }
 
+  void skipRow() {
+    maxSize -- ;
+    maxRowNum -- ;
+  }
+
   boolean isFull() {
     return rowIndex == maxRowNum || count >= maxSize;
   }