Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2018/08/01 01:46:16 UTC

[GitHub] carbondata pull request #2591: [WIP][CARBONDATA-2808] Insert into select is ...

Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2591#discussion_r206733033
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
    @@ -457,13 +458,25 @@ class CarbonScanRDD[T: ClassTag](
             }
           }
     
    +      // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
    +      // no memory leak should be there, resources should be freed on success completion.
    +      // TODO: If CarbonRecordReader and VectorizedCarbonReader are called directly
    +      // from any other callers, then in the caller, resource clearing should be taken care
    +      val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context)
    +        .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
    +      val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
           // add task completion before calling initialize as initialize method will internally call
           // for usage of unsafe method for processing of one blocklet and if there is any exception
           // while doing that the unsafe memory occupied for that task will not get cleared
    -      context.addTaskCompletionListener { _ =>
    -        closeReader.apply()
    -        close()
    -        logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
    +      context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
    --- End diff --
    
    Remove the duplicated code that was copied from this class into `QueryTaskCompletionListener`.

