Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/22 16:45:24 UTC
[42/50] [abbrv] carbondata git commit: [CARBONDATA-2198] Fixed bug for streaming data for bad_records_action as REDIRECT or IGNORE
[CARBONDATA-2198] Fixed bug for streaming data for bad_records_action as REDIRECT or IGNORE
1. Refactored streaming functionality for bad_records_action as IGNORE or REDIRECT
2. Added related test cases
This closes #2014
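
For reference, the option keys this change exercises can be set directly on a Spark structured streaming write. Below is a minimal sketch using the keys seen in the test harness of this patch (bad_records_action, BAD_RECORD_PATH, dbName, tableName); the socket source, host/port, paths and table names are illustrative only, and the "carbondata" sink format plus the ProcessingTime trigger are assumed to be available as in the test code further down.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

object BadRecordRedirectExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bad-record-redirect").getOrCreate()

    // Any streaming source works; a socket source mirrors the test setup.
    val source = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9099)
      .load()

    val query = source.writeStream
      .format("carbondata")
      .trigger(ProcessingTime("2 seconds"))
      .option("checkpointLocation", "/tmp/streaming/checkpoint")  // illustrative path
      .option("bad_records_action", "redirect")                   // or "ignore"
      .option("BAD_RECORD_PATH", "/tmp/streaming/badrecords")     // where rejected rows land
      .option("dbName", "streaming")
      .option("tableName", "bad_record_redirect")
      .start()

    query.awaitTermination()
  }
}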
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59693123
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59693123
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59693123
Branch: refs/heads/branch-1.4
Commit: 59693123da1c7aad17c284887e7819235427af74
Parents: 3394128
Author: Geetika Gupta <ge...@knoldus.in>
Authored: Wed Feb 28 16:09:48 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 22 10:30:00 2018 +0530
----------------------------------------------------------------------
.../core/datastore/row/CarbonRow.java | 4 +
.../TestStreamingTableOperation.scala | 76 ++++++++++-
.../streaming/CarbonStreamRecordWriter.java | 126 ++++++++++---------
.../streaming/StreamBlockletWriter.java | 5 +
4 files changed, 148 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index bb624af..82f004f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -91,4 +91,8 @@ public class CarbonRow implements Serializable {
public void setRangeId(short rangeId) {
this.rangeId = rangeId;
}
+
+ public void clearData() {
+ this.data = null;
+ }
}
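
The new clearData() matters because the streaming writer reuses one CarbonRow instance across records (see CarbonStreamRecordWriter below). A minimal, self-contained Scala sketch of that reuse pattern (not the CarbonData class itself) shows why a rejected record has to drop its field array before the next row is processed:

class ReusableRow {
  private var data: Array[AnyRef] = _

  def setData(values: Array[AnyRef]): Unit = { data = values }

  def getObject(ordinal: Int): AnyRef = data(ordinal)

  // Same intent as the new CarbonRow.clearData(): drop the reference so the
  // values of a skipped (bad) record cannot leak into the next row written.
  def clearData(): Unit = { data = null }
}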
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index f46505a..325722d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -45,8 +45,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
private val spark = sqlContext.sparkSession
private val dataFilePath = s"$resourcesPath/streamSample.csv"
+ def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
+ .getCanonicalPath
+ val badRecordFilePath: File = new File(currentPath + "/target/test/badRecords")
override def beforeAll {
+ badRecordFilePath.mkdirs()
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -1562,6 +1566,68 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
assertResult("true")(resultStreaming(0).getString(1).trim)
}
+
+ test("test bad_record_action IGNORE on streaming table") {
+
+sql("drop table if exists streaming.bad_record_ignore")
+ sql(
+ s"""
+ | CREATE TABLE streaming.bad_record_ignore(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('streaming'='true')
+ | """.stripMargin)
+
+ executeStreamingIngest(
+ tableName = "bad_record_ignore",
+ batchNums = 2,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 8,
+ generateBadRecords = true,
+ badRecordAction = "ignore",
+ autoHandoff = false
+ )
+
+ checkAnswer(sql("select count(*) from streaming.bad_record_ignore"), Seq(Row(19)))
+ }
+
+ test("test bad_record_action REDIRECT on streaming table") {
+ sql("drop table if exists streaming.bad_record_redirect")
+ sql(
+ s"""
+ | CREATE TABLE streaming.bad_record_redirect(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('streaming'='true')
+ | """.stripMargin)
+
+ executeStreamingIngest(
+ tableName = "bad_record_redirect",
+ batchNums = 2,
+ rowNumsEachBatch = 10,
+ intervalOfSource = 1,
+ intervalOfIngest = 1,
+ continueSeconds = 8,
+ generateBadRecords = true,
+ badRecordAction = "redirect",
+ autoHandoff = false,
+ badRecordsPath = badRecordFilePath.getCanonicalPath
+ )
+ assert(new File(badRecordFilePath.getCanonicalFile + "/streaming/bad_record_redirect").isDirectory)
+ checkAnswer(sql("select count(*) from streaming.bad_record_redirect"), Seq(Row(19)))
+ }
+
+
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
@@ -1625,7 +1691,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecordAction: String = "force",
intervalSecond: Int = 2,
handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
- autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean,
+ badRecordsPath: String = CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
): Thread = {
new Thread() {
override def run(): Unit = {
@@ -1643,6 +1710,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.trigger(ProcessingTime(s"$intervalSecond seconds"))
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
.option("bad_records_action", badRecordAction)
+ .option("BAD_RECORD_PATH", badRecordsPath)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
@@ -1676,7 +1744,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
generateBadRecords: Boolean,
badRecordAction: String,
handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
- autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean,
+ badRecordsPath: String = CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
): Unit = {
val identifier = new TableIdentifier(tableName, Option("streaming"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
@@ -1698,7 +1767,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
badRecordAction = badRecordAction,
intervalSecond = intervalOfIngest,
handoffSize = handoffSize,
- autoHandoff = autoHandoff)
+ autoHandoff = autoHandoff,
+ badRecordsPath = badRecordsPath)
thread1.start()
thread2.start()
Thread.sleep(continueSeconds * 1000)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 4e555d3..4653445 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -179,75 +179,81 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
initializeAtFirstRow();
}
- // parse and convert row
- currentRow.setData(rowParser.parseRow((Object[]) value));
- converter.convert(currentRow);
-
// null bit set
nullBitSet.clear();
- for (int i = 0; i < dataFields.length; i++) {
- if (null == currentRow.getObject(i)) {
- nullBitSet.set(i);
+ Object[] rowData = (Object[]) value;
+ currentRow.setRawData(rowData);
+ // parse and convert row
+ currentRow.setData(rowParser.parseRow(rowData));
+ CarbonRow updatedCarbonRow = converter.convert(currentRow);
+ if (updatedCarbonRow == null) {
+ output.skipRow();
+ currentRow.clearData();
+ } else {
+ for (int i = 0; i < dataFields.length; i++) {
+ if (null == currentRow.getObject(i)) {
+ nullBitSet.set(i);
+ }
}
- }
- output.nextRow();
- byte[] b = nullBitSet.toByteArray();
- output.writeShort(b.length);
- if (b.length > 0) {
- output.writeBytes(b);
- }
- int dimCount = 0;
- Object columnValue;
+ output.nextRow();
+ byte[] b = nullBitSet.toByteArray();
+ output.writeShort(b.length);
+ if (b.length > 0) {
+ output.writeBytes(b);
+ }
+ int dimCount = 0;
+ Object columnValue;
- // primitive type dimension
- for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
- columnValue = currentRow.getObject(dimCount);
- if (null != columnValue) {
- if (isNoDictionaryDimensionColumn[dimCount]) {
+ // primitive type dimension
+ for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
+ if (isNoDictionaryDimensionColumn[dimCount]) {
+ byte[] col = (byte[]) columnValue;
+ output.writeShort(col.length);
+ output.writeBytes(col);
+ } else {
+ output.writeInt((int) columnValue);
+ }
+ }
+ }
+ // complex type dimension
+ for (; dimCount < dimensionWithComplexCount; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
byte[] col = (byte[]) columnValue;
output.writeShort(col.length);
output.writeBytes(col);
- } else {
- output.writeInt((int) columnValue);
}
}
- }
- // complex type dimension
- for (; dimCount < dimensionWithComplexCount; dimCount++) {
- columnValue = currentRow.getObject(dimCount);
- if (null != columnValue) {
- byte[] col = (byte[]) columnValue;
- output.writeShort(col.length);
- output.writeBytes(col);
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++) {
- columnValue = currentRow.getObject(dimCount + msrCount);
- if (null != columnValue) {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- output.writeBoolean((boolean) columnValue);
- } else if (dataType == DataTypes.SHORT) {
- output.writeShort((short) columnValue);
- } else if (dataType == DataTypes.INT) {
- output.writeInt((int) columnValue);
- } else if (dataType == DataTypes.LONG) {
- output.writeLong((long) columnValue);
- } else if (dataType == DataTypes.DOUBLE) {
- output.writeDouble((double) columnValue);
- } else if (DataTypes.isDecimal(dataType)) {
- BigDecimal val = (BigDecimal) columnValue;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- output.writeShort(bigDecimalInBytes.length);
- output.writeBytes(bigDecimalInBytes);
- } else {
- String msg =
- "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
- .getName();
- LOGGER.error(msg);
- throw new IOException(msg);
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+ columnValue = currentRow.getObject(dimCount + msrCount);
+ if (null != columnValue) {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ output.writeBoolean((boolean) columnValue);
+ } else if (dataType == DataTypes.SHORT) {
+ output.writeShort((short) columnValue);
+ } else if (dataType == DataTypes.INT) {
+ output.writeInt((int) columnValue);
+ } else if (dataType == DataTypes.LONG) {
+ output.writeLong((long) columnValue);
+ } else if (dataType == DataTypes.DOUBLE) {
+ output.writeDouble((double) columnValue);
+ } else if (DataTypes.isDecimal(dataType)) {
+ BigDecimal val = (BigDecimal) columnValue;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ output.writeShort(bigDecimalInBytes.length);
+ output.writeBytes(bigDecimalInBytes);
+ } else {
+ String msg =
+ "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+ .getName();
+ LOGGER.error(msg);
+ throw new IOException(msg);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59693123/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 509e2aa..7b2176b 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -76,6 +76,11 @@ public class StreamBlockletWriter {
rowIndex++;
}
+ void skipRow() {
+ maxSize--;
+ maxRowNum--;
+ }
+
boolean isFull() {
return rowIndex == maxRowNum || count >= maxSize;
}
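
Read together with isFull() above, the new skipRow() makes a rejected record count against the blocklet's capacity without buffering any bytes for it. A rough, self-contained Scala model of those counters (not the CarbonData class; field names follow the diff) illustrates the bookkeeping:

class BlockletCapacity(private var maxRowNum: Int, private var maxSize: Int) {
  private var rowIndex = -1  // index of the last row appended
  private var count = 0      // bytes buffered so far

  def nextRow(rowBytes: Int): Unit = { rowIndex += 1; count += rowBytes }

  // Same shape as the new StreamBlockletWriter.skipRow(): shrink both limits
  // instead of advancing rowIndex/count for the dropped record.
  def skipRow(): Unit = { maxSize -= 1; maxRowNum -= 1 }

  def isFull: Boolean = rowIndex == maxRowNum || count >= maxSize
}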