You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2017/11/06 08:22:33 UTC

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/1470

    [CARBONDATA-1572] Support streaming ingest and query

    1. row format writer and support to append batch data
    
    2. support StreamSinkProvider and append batch data to row format file
    
    3. row format reader and support to split row format file to small blocks
    
    4. query with streaming row format file.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata streaming_all

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/1470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1470
    
----
commit 702d4564a64574f50bb5e9b9e431b9830b1e525f
Author: QiangCai <qi...@qq.com>
Date:   2017-10-18T03:13:00Z

    support streaming ingest and query

----


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/896/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070335
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    +          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
    +          val inputFormat = new CarbonStreamInputFormat
    +          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    +            .asInstanceOf[CarbonStreamRecordReader]
    +          streamReader.setVectorReader(vectorReader)
    +          model.setStatisticsRecorder(
    +            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
    +          streamReader.setQueryModel(model)
    --- End diff --
    
    It is better to put `model` in constructor of `CarbonStreamRecordReader`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149617389
  
    --- Diff: pom.xml ---
    @@ -478,6 +471,7 @@
             <module>integration/spark2</module>
             <module>integration/hive</module>
             <module>integration/presto</module>
    +        <module>streaming</module>
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149064920
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    --- End diff --
    
    add space after `for`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149283558
  
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    +
    +  def createStreamTableSink(
    +      sparkSession: SparkSession,
    +      carbonTable: CarbonTable,
    +      parameters: Map[String, String]): Sink = {
    +      validateParameters(parameters)
    +
    +    // prepare the stream segment
    +    val segmentId = getStreamSegmentId(carbonTable)
    +    // build load model
    +    val carbonLoadModel = buildCarbonLoadModelForStream(
    +      sparkSession,
    +      carbonTable,
    +      parameters,
    +      segmentId)
    +    // start server if necessary
    +    val server = startDictionaryServer(
    --- End diff --
    
    this method will add SparkListener to sc, it will trigger shutdown on application end.


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149071231
  
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    +
    +  def createStreamTableSink(
    +      sparkSession: SparkSession,
    +      carbonTable: CarbonTable,
    +      parameters: Map[String, String]): Sink = {
    +      validateParameters(parameters)
    --- End diff --
    
    Incorrect identation


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070667
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---
    @@ -208,6 +211,31 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         }
       }
     
    +  override def createSink(sqlContext: SQLContext,
    --- End diff --
    
    please add description of this function 


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149067107
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    +    if (streamSplits.isEmpty) {
    +      batchPartitions
    +    } else {
    +      val index = batchPartitions.length
    +      val streamPartitions: ArrayBuffer[Partition] =
    +        streamSplits.zipWithIndex.map { splitWithIndex =>
    +          val multiBlockSplit =
    +            new CarbonMultiBlockSplit(identifier,
    +              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
    +              splitWithIndex._1.getLocations)
    +          multiBlockSplit.setStream(true)
    +          new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
    +        }
    +      if (batchPartitions.isEmpty) {
    --- End diff --
    
    instead of this, you can do
    ```
    streamPartitions.appendAll(batchPartitions)
    streamPartitions.toArray
    ```


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1517/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Posted by chenliang613 <gi...@git.apache.org>.
Github user chenliang613 commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    LGTM


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by chenliang613 <gi...@git.apache.org>.
Github user chenliang613 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149608765
  
    --- Diff: pom.xml ---
    @@ -478,6 +471,7 @@
             <module>integration/spark2</module>
             <module>integration/hive</module>
             <module>integration/presto</module>
    +        <module>streaming</module>
    --- End diff --
    
    please add streaming module to "build-all" profile


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149072656
  
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    --- End diff --
    
    Add description of this class


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    test this please


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/877/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1511/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149068014
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    +          DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
    +          val inputFormat = new CarbonStreamInputFormat
    +          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
    +            .asInstanceOf[CarbonStreamRecordReader]
    +          streamReader.setVectorReader(vectorReader)
    +          model.setStatisticsRecorder(
    +            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
    +          streamReader.setQueryModel(model)
    +          streamReader
    +        } else if (vectorReader) {
    --- End diff --
    
    It is better to use else instead of else if, so that in the else block it handles for columnar format


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/828/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1490/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1515/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/911/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149065241
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    --- End diff --
    
    use `splits.asScala.foreach` instead of `for` loop


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/900/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149078206
  
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    +    sparkSession: SparkSession,
    +    val carbonTable: CarbonTable,
    +    var currentSegmentId: String,
    +    parameters: Map[String, String],
    +    carbonLoadModel: CarbonLoadModel,
    +    sever: Option[DictionaryServer]) extends Sink {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val carbonTablePath = CarbonStorePath
    +    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +  private val fileLogPath = carbonTablePath.getStreamingLogDir
    +  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
    +  // prepare configuration
    +  private val hadoopConf = {
    +    val conf = sparkSession.sessionState.newHadoopConf()
    +    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel)
    +    // put all parameters into hadoopConf
    +    parameters.foreach { entry =>
    +      conf.set(entry._1, entry._2)
    +    }
    +    conf
    +  }
    +
    +  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    +    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    +      LOGGER.info(s"Skipping already committed batch $batchId")
    +    } else {
    +      checkOrHandOffSegment()
    +
    +      val committer = FileCommitProtocol.instantiate(
    +        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
    +        jobId = batchId.toString,
    +        outputPath = fileLogPath,
    +        isAppend = false)
    +
    +      committer match {
    +        case manifestCommitter: ManifestFileCommitProtocol =>
    +          manifestCommitter.setupManifestOptions(fileLog, batchId)
    +        case _ => // Do nothing
    +      }
    +
    +      CarbonStreamProcessor.writeDataFileJob(
    +        sparkSession,
    +        carbonTable,
    +        parameters,
    +        batchId,
    +        currentSegmentId,
    +        data.queryExecution,
    +        committer,
    +        hadoopConf,
    +        sever)
    +    }
    +  }
    +
    +  // if the directory size of current segment beyond the threshold, hand off new segment
    +  private def checkOrHandOffSegment(): Unit = {
    +    val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
    +    val fileType = FileFactory.getFileType(segmentDir)
    +    if (StreamSegmentManager.STREAM_SEGMENT_MAX_SIZE <= FileFactory.getDirectorySize(segmentDir)) {
    --- End diff --
    
    Can we make use of metadata instead of checking file system for every batch?


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1526/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/903/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    retest this please


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149067739
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    +    if (streamSplits.isEmpty) {
    +      batchPartitions
    +    } else {
    +      val index = batchPartitions.length
    +      val streamPartitions: ArrayBuffer[Partition] =
    +        streamSplits.zipWithIndex.map { splitWithIndex =>
    +          val multiBlockSplit =
    +            new CarbonMultiBlockSplit(identifier,
    +              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
    +              splitWithIndex._1.getLocations)
    +          multiBlockSplit.setStream(true)
    --- End diff --
    
    I think you can set the same DATA_FILE_FORMAT enum in `multiBlockSplit`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149617386
  
    --- Diff: pom.xml ---
    @@ -333,7 +333,7 @@
             <version>3.0.4</version>
             <configuration>
               <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
    -          <failOnError>true</failOnError>
    +          <failOnError>false</failOnError>
    --- End diff --
    
    I will recovery it


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/902/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/1470


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/853/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1522/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149078482
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegmentManager.java ---
    @@ -0,0 +1,302 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming.segment;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.locks.ICarbonLock;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.path.CarbonStorePath;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
    +import org.apache.carbondata.format.BlockIndex;
    +import org.apache.carbondata.format.BlockletIndex;
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat;
    +
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +
    +/**
    + * streaming segment manager
    + */
    +public class StreamSegmentManager {
    --- End diff --
    
    Can we unify it with `SegmentStatusManager`


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149600151
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---
    @@ -0,0 +1,347 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop.streaming;
    +
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +import org.apache.carbondata.core.util.path.CarbonStorePath;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +import org.apache.carbondata.format.FileHeader;
    +import org.apache.carbondata.processing.loading.BadRecordsLogger;
    +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    +import org.apache.carbondata.processing.loading.DataField;
    +import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
    +import org.apache.carbondata.processing.loading.converter.RowConverter;
    +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.loading.parser.RowParser;
    +import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
    +import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
    +import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
    +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskID;
    +
    +/**
    + * Stream record writer
    + */
    +public class CarbonStreamRecordWriter extends RecordWriter {
    +
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
    +
    +  // basic info
    +  private Configuration hadoopConf;
    +  private CarbonDataLoadConfiguration configuration;
    +  private String segmentId;
    +  private int taskNo;
    +  private CarbonTable carbonTable;
    +  private int maxRowNums;
    +  private int maxCacheSize;
    +
    +  // parser and converter
    +  private RowParser rowParser;
    +  private RowConverter converter;
    +  private CarbonRow currentRow = new CarbonRow(null);
    +
    +  // encoder
    +  private DataField[] dataFields;
    +  private BitSet nullBitSet;
    +  private boolean[] isNoDictionaryDimensionColumn;
    +  private int dimensionWithComplexCount;
    +  private int measureCount;
    +  private int[] measureDataTypes;
    +  private StreamBlockletWriter output = null;
    +
    +  // data write
    +  private String segmentDir;
    +  private String fileName;
    +  private DataOutputStream outputStream;
    +  private boolean isFirstRow = true;
    +  private boolean hasException = false;
    +
    +  CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
    +    initialize(job);
    +  }
    +
    +  /**
    +   *
    +   */
    +  private void initialize(TaskAttemptContext job) throws IOException {
    +    // set basic information
    +    hadoopConf = job.getConfiguration();
    +    CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
    +    if (carbonLoadModel == null) {
    +      throw new IOException(
    +          "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
    +    }
    +    segmentId = carbonLoadModel.getSegmentId();
    +    carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
    +    taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
    +    carbonLoadModel.setTaskNo("" + taskNo);
    +    configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
    +    maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
    +        CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
    +    maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
    +        CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
    +    // try recover data file from fault for task at first
    +    tryRecoverFromFault();
    +  }
    +
    +  /**
    +   * try recover data file from fault for task
    +   */
    +  private void tryRecoverFromFault() throws IOException {
    +    CarbonTablePath tablePath =
    +        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
    +    segmentDir = tablePath.getSegmentDir("0", segmentId);
    +    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
    +    String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    +    CarbonStreamRecordWriter.recoverDataFile(segmentDir, fileName, indexName);
    +  }
    +
    +  public static void recoverDataFile(String segmentDir, String fileName, String indexName)
    --- End diff --
    
    Better to keep file recovery and segment recovery handling function in same class


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1502/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/887/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149071325
  
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    --- End diff --
    
    This is not used


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149065658
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    --- End diff --
    
    It is better to use enum instead of string compare


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149077497
  
    --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.internal.io.FileCommitProtocol
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +class CarbonAppendableStreamSink(
    +    sparkSession: SparkSession,
    +    val carbonTable: CarbonTable,
    +    var currentSegmentId: String,
    +    parameters: Map[String, String],
    +    carbonLoadModel: CarbonLoadModel,
    +    sever: Option[DictionaryServer]) extends Sink {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +  private val carbonTablePath = CarbonStorePath
    +    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    +  private val fileLogPath = carbonTablePath.getStreamingLogDir
    +  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
    +  // prepare configuration
    +  private val hadoopConf = {
    +    val conf = sparkSession.sessionState.newHadoopConf()
    +    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel)
    +    // put all parameters into hadoopConf
    +    parameters.foreach { entry =>
    +      conf.set(entry._1, entry._2)
    +    }
    +    conf
    +  }
    +
    +  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    +    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    +      LOGGER.info(s"Skipping already committed batch $batchId")
    +    } else {
    +      checkOrHandOffSegment()
    +
    +      val committer = FileCommitProtocol.instantiate(
    +        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
    +        jobId = batchId.toString,
    +        outputPath = fileLogPath,
    +        isAppend = false)
    +
    +      committer match {
    +        case manifestCommitter: ManifestFileCommitProtocol =>
    +          manifestCommitter.setupManifestOptions(fileLog, batchId)
    +        case _ => // Do nothing
    +      }
    +
    +      CarbonStreamProcessor.writeDataFileJob(
    --- End diff --
    
    I think it is not required to create `CarbonStreamProcessor`, why not move this function here


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/907/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149066956
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -82,8 +84,43 @@ class CarbonScanRDD(
     
         // get splits
         val splits = format.getSplits(job)
    -    val result = distributeSplits(splits)
    -    result
    +
    +    // separate split
    +    // 1. for batch splits, invoke distributeSplits method to create partitions
    +    // 2. for stream splits, create partition for each split by default
    +    val columnarSplits = new ArrayList[InputSplit]()
    +    val streamSplits = new ArrayBuffer[InputSplit]()
    +    for(i <- 0 until splits.size()) {
    +      val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
    +      if ("row-format".equals(carbonInputSplit.getFormat)) {
    +        streamSplits += splits.get(i)
    +      } else {
    +        columnarSplits.add(splits.get(i))
    +      }
    +    }
    +    val batchPartitions = distributeSplits(columnarSplits)
    --- End diff --
    
    suggest rename to `distributeBatchSplits`


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1491/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149072545
  
    --- Diff: streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.dictionary.server.DictionaryServer
    +import org.apache.carbondata.core.metadata.encoder.Encoding
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonStorePath
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.spark.util.DataLoadingUtil
    +import org.apache.carbondata.streaming.segment.StreamSegmentManager
    +
    +/**
    + * Stream sink factory
    + */
    +object StreamSinkFactory {
    +
    +  private val LOGGER = LogServiceFactory.getLogService(StreamSinkFactory.getClass.getCanonicalName)
    +
    +  def createStreamTableSink(
    +      sparkSession: SparkSession,
    +      carbonTable: CarbonTable,
    +      parameters: Map[String, String]): Sink = {
    +      validateParameters(parameters)
    +
    +    // prepare the stream segment
    +    val segmentId = getStreamSegmentId(carbonTable)
    +    // build load model
    +    val carbonLoadModel = buildCarbonLoadModelForStream(
    +      sparkSession,
    +      carbonTable,
    +      parameters,
    +      segmentId)
    +    // start server if necessary
    +    val server = startDictionaryServer(
    --- End diff --
    
    Should add try-catch block to shutdown the server if anything failed before this function  returns


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1484/



---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572][Streaming] Support streamin...

Posted by chenliang613 <gi...@git.apache.org>.
Github user chenliang613 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149607861
  
    --- Diff: pom.xml ---
    @@ -333,7 +333,7 @@
             <version>3.0.4</version>
             <configuration>
               <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
    -          <failOnError>true</failOnError>
    +          <failOnError>false</failOnError>
    --- End diff --
    
    why changes this to 'false'?


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1462/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1518/



---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    LGTM


---

[GitHub] carbondata pull request #1470: [CARBONDATA-1572] Support streaming ingest an...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1470#discussion_r149070798
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -210,8 +247,18 @@ class CarbonScanRDD(
         inputMetricsStats.initBytesReadCallback(context, inputSplit)
         val iterator = if (inputSplit.getAllSplits.size() > 0) {
           val model = format.getQueryModel(inputSplit, attemptContext)
    -      val reader = {
    -        if (vectorReader) {
    +      val reader: RecordReader[Void, Object] = {
    +        if (inputSplit.isStream) {
    --- End diff --
    
    Add some description of this code block


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572][Streaming] Support streaming inges...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    retest this please


---

[GitHub] carbondata issue #1470: [CARBONDATA-1572] Support streaming ingest and query

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/1470
  
    Please give following description:
    ```
     - [X] Any interfaces changed?
     No
    
     - [X] Any backward compatibility impacted?
     No
    
     - [X] Document update required?
    No
    
     - [X] Testing done
    No new testcase is required
    
     - [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    NA
    ```


---