You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by zh...@apache.org on 2018/10/19 15:15:04 UTC

carbondata git commit: Fix unsupported data type exception for streaming

Repository: carbondata
Updated Branches:
  refs/heads/master f810389fa -> f664805bf


Fix unsupported data type exception for streaming

Background:

When Spark uses Kryo serialization, the streaming app throws an "Unsupported data type" exception.

Root cause:

  1. The measure data type list is collected from the executor side to the driver side, so it has to be serialized.

  2. When Kryo is used, the DataType singleton instances do not work after serialization.

Solution:

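Do not collect the measure data type list from the executor side. Instead, derive it on the driver side from the table schema and pass it to the index update, which avoids serializing it.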

This closes #2832


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f664805b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f664805b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f664805b

Branch: refs/heads/master
Commit: f664805bf6fc9dea1683123aa7a25e7913dac9ae
Parents: f810389
Author: QiangCai <qi...@qq.com>
Authored: Thu Oct 18 10:56:13 2018 +0800
Committer: Zhang Zhichao <44...@qq.com>
Committed: Fri Oct 19 23:14:33 2018 +0800

----------------------------------------------------------------------
 .../streaming/CarbonAppendableStreamSink.scala  | 25 +++++++++++++++-----
 .../streaming/index/StreamFileIndex.java        | 10 --------
 .../streaming/segment/StreamSegment.java        | 19 +++++++--------
 3 files changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 2fdbc86..3d8170e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -36,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
@@ -43,7 +46,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.StreamHandoffRDD
@@ -102,6 +104,17 @@ class CarbonAppendableStreamSink(
     CarbonProperties.getInstance().isEnableAutoHandoff
   )
 
+  // measure data type array
+  private lazy val msrDataTypes = {
+    carbonLoadModel
+      .getCarbonDataLoadSchema
+      .getCarbonTable
+      .getMeasures
+      .asScala
+      .map(_.getDataType)
+      .toArray
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
@@ -133,14 +146,14 @@ class CarbonAppendableStreamSink(
       CarbonAppendableStreamSink.writeDataFileJob(
         sparkSession,
         carbonTable,
-        parameters,
         batchId,
         currentSegmentId,
         data.queryExecution,
         committer,
         hadoopConf,
         carbonLoadModel,
-        server)
+        server,
+        msrDataTypes)
       // fire post event on every batch add
       val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
@@ -212,14 +225,14 @@ object CarbonAppendableStreamSink {
   def writeDataFileJob(
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
-      parameters: Map[String, String],
       batchId: Long,
       segmentId: String,
       queryExecution: QueryExecution,
       committer: FileCommitProtocol,
       hadoopConf: Configuration,
       carbonLoadModel: CarbonLoadModel,
-      server: Option[DictionaryServer]): Unit = {
+      server: Option[DictionaryServer],
+      msrDataTypes: Array[DataType]): Unit = {
 
     // create job
     val job = Job.getInstance(hadoopConf)
@@ -276,7 +289,7 @@ object CarbonAppendableStreamSink {
         // update data file info in index file
         StreamSegment.updateIndexFile(
           CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
-          result.map(_._2))
+          result.map(_._2), msrDataTypes)
 
       } catch {
         // catch fault of executor side

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
index fa8a694..0a5113e 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 @InterfaceAudience.Internal
 public class StreamFileIndex implements Serializable {
@@ -35,8 +34,6 @@ public class StreamFileIndex implements Serializable {
 
   private long rowCount;
 
-  private DataType[] msrDataTypes;
-
   public StreamFileIndex(String fileName, BlockletMinMaxIndex minMaxIndex, long rowCount) {
     this.fileName = fileName;
     this.minMaxIndex = minMaxIndex;
@@ -67,11 +64,4 @@ public class StreamFileIndex implements Serializable {
     this.rowCount = rowCount;
   }
 
-  public DataType[] getMsrDataTypes() {
-    return msrDataTypes;
-  }
-
-  public void setMsrDataTypes(DataType[] msrDataTypes) {
-    this.msrDataTypes = msrDataTypes;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index b436b18..ba3d64a 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -269,10 +269,9 @@ public class StreamSegment {
    * create a StreamBlockIndex from the SimpleStatsResult array
    */
   private static StreamFileIndex createStreamBlockIndex(String fileName,
-      BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+      BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
     StreamFileIndex streamFileIndex =
         new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
-    streamFileIndex.setMsrDataTypes(msrDataTypes);
     return streamFileIndex;
   }
 
@@ -298,7 +297,7 @@ public class StreamSegment {
       inputIterators.close();
 
       return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
-          writer.getMeasureDataTypes(), blockletRowCount);
+          blockletRowCount);
     } catch (Throwable ex) {
       if (writer != null) {
         LOGGER.error("Failed to append batch data to stream segment: " +
@@ -449,8 +448,8 @@ public class StreamSegment {
    * 2.1 if blocklet index is null, use the BlockletMinMaxIndex index of stream
    * 2.2 if blocklet index is not null, combine these two index
    */
-  private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
-      throws IOException {
+  private static void mergeBatchMinMax(StreamFileIndex blockletIndex,
+      BlockletMinMaxIndex fileIndex, DataType[] msrDataTypes) throws IOException {
     if (fileIndex == null) {
       // backward compatibility
       // it will not create a min/max index for the old stream file(without min/max index).
@@ -465,7 +464,6 @@ public class StreamSegment {
       return;
     }
 
-    DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
     SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
     for (int index = 0; index < comparators.length; index++) {
       comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
@@ -594,7 +592,8 @@ public class StreamSegment {
    * merge new blocklet index and old file index to create new file index
    */
   private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
-      String indexPath, FileFactory.FileType fileType) throws IOException {
+      String indexPath, FileFactory.FileType fileType, DataType[] msrDataTypes
+  ) throws IOException {
     List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
     for (BlockIndex blockIndex : blockIndexList) {
       BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
@@ -607,7 +606,7 @@ public class StreamSegment {
       } else {
         // merge minMaxIndex into StreamBlockIndex
         blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
-        mergeBatchMinMax(blockletIndex, fileIndex);
+        mergeBatchMinMax(blockletIndex, fileIndex, msrDataTypes);
       }
     }
   }
@@ -616,7 +615,7 @@ public class StreamSegment {
    * update carbon index file after a stream batch.
    */
   public static void updateIndexFile(String segmentDir,
-      StreamFileIndex[] blockIndexes) throws IOException {
+      StreamFileIndex[] blockIndexes, DataType[] msrDataTypes) throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
     String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
     // update min/max index
@@ -624,7 +623,7 @@ public class StreamSegment {
     for (StreamFileIndex fileIndex : blockIndexes) {
       indexMap.put(fileIndex.getFileName(), fileIndex);
     }
-    updateStreamFileIndex(indexMap, filePath, fileType);
+    updateStreamFileIndex(indexMap, filePath, fileType, msrDataTypes);
 
     String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();