Posted to commits@carbondata.apache.org by zh...@apache.org on 2018/10/19 15:15:04 UTC
carbondata git commit: Fix unsupported data type exception for streaming
Repository: carbondata
Updated Branches:
refs/heads/master f810389fa -> f664805bf
Fix unsupported data type exception for streaming
Background:
When Spark uses Kryo serialization, the streaming application throws the exception "Unsupported data type".
Root cause:
1. The measure data type list is collected from the executor side back to the driver side, which requires serializing the DataType instances.
2. When Kryo is used, the DataType singleton instances do not survive (de)serialization: Kryo creates new objects, so identity-based checks against the singletons fail.
Solution:
Do not collect the measure data type list from the executor side; build it once on the driver side from the table schema instead, so the DataType instances never need to be serialized.
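
To make the root cause concrete, here is a minimal, self-contained Scala sketch. It is not part of this commit: KryoSingletonDemo and FakeDataType are hypothetical stand-ins for CarbonData's singleton DataType instances such as DataTypes.INT. It shows that a Kryo round trip yields a brand-new object, so any check that relies on the singleton's identity fails after deserialization:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoSingletonDemo {

  // Hypothetical stand-in for a singleton type instance like DataTypes.INT.
  // The no-arg constructor lets Kryo's default FieldSerializer instantiate it.
  class FakeDataType(val name: String) extends Serializable {
    def this() = this("")
  }
  val INT = new FakeDataType("INT")

  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)

    // Serialize the singleton with Kryo.
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, INT)
    output.close()

    // Deserialize: Kryo constructs a brand-new instance.
    val input = new Input(new ByteArrayInputStream(buffer.toByteArray))
    val copy = kryo.readClassAndObject(input).asInstanceOf[FakeDataType]
    input.close()

    println(copy eq INT)           // false: the singleton's identity is lost
    println(copy.name == INT.name) // true: only the field values survive
  }
}

This is why the patch below computes msrDataTypes once on the driver, from the table schema held by the CarbonLoadModel, and passes the array down through writeDataFileJob and updateIndexFile rather than shipping DataType instances back from executors.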
This closes #2832
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f664805b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f664805b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f664805b
Branch: refs/heads/master
Commit: f664805bf6fc9dea1683123aa7a25e7913dac9ae
Parents: f810389
Author: QiangCai <qi...@qq.com>
Authored: Thu Oct 18 10:56:13 2018 +0800
Committer: Zhang Zhichao <44...@qq.com>
Committed: Fri Oct 19 23:14:33 2018 +0800
----------------------------------------------------------------------
.../streaming/CarbonAppendableStreamSink.scala | 25 +++++++++++++++-----
.../streaming/index/StreamFileIndex.java | 10 --------
.../streaming/segment/StreamSegment.java | 19 +++++++--------
3 files changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 2fdbc86..3d8170e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.Date
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -36,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.CarbonProperties
@@ -43,7 +46,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
@@ -102,6 +104,17 @@ class CarbonAppendableStreamSink(
CarbonProperties.getInstance().isEnableAutoHandoff
)
+ // measure data type array
+ private lazy val msrDataTypes = {
+ carbonLoadModel
+ .getCarbonDataLoadSchema
+ .getCarbonTable
+ .getMeasures
+ .asScala
+ .map(_.getDataType)
+ .toArray
+ }
+
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
@@ -133,14 +146,14 @@ class CarbonAppendableStreamSink(
CarbonAppendableStreamSink.writeDataFileJob(
sparkSession,
carbonTable,
- parameters,
batchId,
currentSegmentId,
data.queryExecution,
committer,
hadoopConf,
carbonLoadModel,
- server)
+ server,
+ msrDataTypes)
// fire post event on every batch add
val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
carbonTable.getCarbonTableIdentifier,
@@ -212,14 +225,14 @@ object CarbonAppendableStreamSink {
def writeDataFileJob(
sparkSession: SparkSession,
carbonTable: CarbonTable,
- parameters: Map[String, String],
batchId: Long,
segmentId: String,
queryExecution: QueryExecution,
committer: FileCommitProtocol,
hadoopConf: Configuration,
carbonLoadModel: CarbonLoadModel,
- server: Option[DictionaryServer]): Unit = {
+ server: Option[DictionaryServer],
+ msrDataTypes: Array[DataType]): Unit = {
// create job
val job = Job.getInstance(hadoopConf)
@@ -276,7 +289,7 @@ object CarbonAppendableStreamSink {
// update data file info in index file
StreamSegment.updateIndexFile(
CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
- result.map(_._2))
+ result.map(_._2), msrDataTypes)
} catch {
// catch fault of executor side
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
index fa8a694..0a5113e 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
@InterfaceAudience.Internal
public class StreamFileIndex implements Serializable {
@@ -35,8 +34,6 @@ public class StreamFileIndex implements Serializable {
private long rowCount;
- private DataType[] msrDataTypes;
-
public StreamFileIndex(String fileName, BlockletMinMaxIndex minMaxIndex, long rowCount) {
this.fileName = fileName;
this.minMaxIndex = minMaxIndex;
@@ -67,11 +64,4 @@ public class StreamFileIndex implements Serializable {
this.rowCount = rowCount;
}
- public DataType[] getMsrDataTypes() {
- return msrDataTypes;
- }
-
- public void setMsrDataTypes(DataType[] msrDataTypes) {
- this.msrDataTypes = msrDataTypes;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index b436b18..ba3d64a 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -269,10 +269,9 @@ public class StreamSegment {
* create a StreamBlockIndex from the SimpleStatsResult array
*/
private static StreamFileIndex createStreamBlockIndex(String fileName,
- BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+ BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
StreamFileIndex streamFileIndex =
new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
- streamFileIndex.setMsrDataTypes(msrDataTypes);
return streamFileIndex;
}
@@ -298,7 +297,7 @@ public class StreamSegment {
inputIterators.close();
return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
- writer.getMeasureDataTypes(), blockletRowCount);
+ blockletRowCount);
} catch (Throwable ex) {
if (writer != null) {
LOGGER.error("Failed to append batch data to stream segment: " +
@@ -449,8 +448,8 @@ public class StreamSegment {
* 2.1 if blocklet index is null, use the BlockletMinMaxIndex index of stream
* 2.2 if blocklet index is not null, combine these two index
*/
- private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
- throws IOException {
+ private static void mergeBatchMinMax(StreamFileIndex blockletIndex,
+ BlockletMinMaxIndex fileIndex, DataType[] msrDataTypes) throws IOException {
if (fileIndex == null) {
// backward compatibility
// it will not create a min/max index for the old stream file(without min/max index).
@@ -465,7 +464,6 @@ public class StreamSegment {
return;
}
- DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
for (int index = 0; index < comparators.length; index++) {
comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
@@ -594,7 +592,8 @@ public class StreamSegment {
* merge new blocklet index and old file index to create new file index
*/
private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
- String indexPath, FileFactory.FileType fileType) throws IOException {
+ String indexPath, FileFactory.FileType fileType, DataType[] msrDataTypes
+ ) throws IOException {
List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
for (BlockIndex blockIndex : blockIndexList) {
BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
@@ -607,7 +606,7 @@ public class StreamSegment {
} else {
// merge minMaxIndex into StreamBlockIndex
blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
- mergeBatchMinMax(blockletIndex, fileIndex);
+ mergeBatchMinMax(blockletIndex, fileIndex, msrDataTypes);
}
}
}
@@ -616,7 +615,7 @@ public class StreamSegment {
* update carbon index file after a stream batch.
*/
public static void updateIndexFile(String segmentDir,
- StreamFileIndex[] blockIndexes) throws IOException {
+ StreamFileIndex[] blockIndexes, DataType[] msrDataTypes) throws IOException {
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
// update min/max index
@@ -624,7 +623,7 @@ public class StreamSegment {
for (StreamFileIndex fileIndex : blockIndexes) {
indexMap.put(fileIndex.getFileName(), fileIndex);
}
- updateStreamFileIndex(indexMap, filePath, fileType);
+ updateStreamFileIndex(indexMap, filePath, fileType, msrDataTypes);
String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();