You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/01 13:41:08 UTC

carbondata git commit: Problem: Insert into select is failing as both are running as single task, both are sharing the same taskcontext and resources are cleared once if any one of the RDD(Select query's ScanRDD) is completed, so the other RDD(LoadRDD) r

Repository: carbondata
Updated Branches:
  refs/heads/master 3816e90e7 -> de9246066


Problem:
Insert into select fails because both RDDs run as a single task and share the same TaskContext. Resources are cleared as soon as any one of the RDDs (the select query's ScanRDD) completes, so the other RDD (LoadRDD), which is still running, crashes when it tries to access the cleared memory.

Solution:
Check if any other RDD is sharing the same task context. If so, don't clear the resources at that time; the other RDD that shares the context should clear the memory once the task is finished.

This closes #2591


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de924606
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de924606
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de924606

Branch: refs/heads/master
Commit: de92460665bafb403a4b90b513a9136b6f1fb34c
Parents: 3816e90
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Jul 31 23:11:49 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Aug 1 19:10:56 2018 +0530

----------------------------------------------------------------------
 .../executor/impl/AbstractQueryExecutor.java    | 11 ++-
 .../carbondata/core/scan/model/QueryModel.java  | 11 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 64 +++++--------
 .../rdd/InsertTaskCompletionListener.scala      | 33 +++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  3 +-
 .../spark/rdd/QueryTaskCompletionListener.scala | 94 ++++++++++++++++++++
 6 files changed, 168 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 5b67921..f87e46e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -87,6 +87,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected QueryExecutorProperties queryProperties;
 
+  // whether to clear/free unsafe memory or not
+  private boolean freeUnsafeMemory;
+
   /**
    * query result iterator which will execute the query
    * and give the result
@@ -114,6 +117,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         queryModel.getQueryId());
     LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
         .getCarbonTableIdentifier().getTableName());
+    this.freeUnsafeMemory = queryModel.isFreeUnsafeMemory();
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
@@ -641,8 +645,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         exceptionOccurred = e;
       }
     }
-    // clear all the unsafe memory used for the given task ID
-    UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+    // clear all the unsafe memory used for the given task ID only if it is neccessary to be cleared
+    if (freeUnsafeMemory) {
+      UnsafeMemoryManager.INSTANCE
+          .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+    }
     if (null != queryProperties.executorService) {
       // In case of limit query when number of limit records is already found so executors
       // must stop all the running execution otherwise it will keep running and will hit

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 55dafb9..31c7a86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -114,6 +114,9 @@ public class QueryModel {
    */
   private boolean isFG;
 
+  // whether to clear/free unsafe memory or not
+  private boolean freeUnsafeMemory = true;
+
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     invalidSegmentIds = new ArrayList<>();
@@ -390,4 +393,12 @@ public class QueryModel {
         projection.getDimensions().size() + projection.getMeasures().size(),
         filterExpressionResolverTree.getFilterExpression().toString());
   }
+
+  public boolean isFreeUnsafeMemory() {
+    return freeUnsafeMemory;
+  }
+
+  public void setFreeUnsafeMemory(boolean freeUnsafeMemory) {
+    this.freeUnsafeMemory = freeUnsafeMemory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 67ea332..6b43999 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
@@ -457,17 +458,29 @@ class CarbonScanRDD[T: ClassTag](
         }
       }
 
+      // create a statistics recorder
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
+      model.setStatisticsRecorder(recorder)
+
+      // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
+      // no memory leak should be there, resources should be freed on success completion.
+      val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context)
+        .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+      val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
+      model.setFreeUnsafeMemory(!isAdded)
       // add task completion before calling initialize as initialize method will internally call
       // for usage of unsafe method for processing of one blocklet and if there is any exception
       // while doing that the unsafe memory occupied for that task will not get cleared
-      context.addTaskCompletionListener { _ =>
-        closeReader.apply()
-        close()
-        logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
+      context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
+        reader,
+        inputMetricsStats,
+        executionId,
+        taskId,
+        queryStartTime,
+        model.getStatisticsRecorder,
+        split,
+        queryId)
       }
-      // create a statistics recorder
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
-      model.setStatisticsRecorder(recorder)
       // initialize the reader
       reader.initialize(inputSplit, attemptContext)
 
@@ -625,43 +638,6 @@ class CarbonScanRDD[T: ClassTag](
     format
   }
 
-  def logStatistics(
-      executionId: String,
-      taskId: Long,
-      queryStartTime: Long,
-      recorder: QueryStatisticsRecorder,
-      split: Partition
-  ): Unit = {
-    if (null != recorder) {
-      val queryStatistic = new QueryStatistic()
-      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-        System.currentTimeMillis - queryStartTime)
-      recorder.recordStatistics(queryStatistic)
-      // print executor query statistics for each task_id
-      val statistics = recorder.statisticsForTask(taskId, queryStartTime)
-      if (statistics != null && executionId != null) {
-        Profiler.invokeIfEnable {
-          val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-          inputSplit.calculateLength()
-          val size = inputSplit.getLength
-          val files = inputSplit.getAllSplits.asScala.map { s =>
-            s.getSegmentId + "/" + s.getPath.getName
-          }.toArray[String]
-          Profiler.send(
-            QueryTaskEnd(
-              executionId.toLong,
-              queryId,
-              statistics.getValues,
-              size,
-              files
-            )
-          )
-        }
-      }
-      recorder.logStatisticsForTask(statistics)
-    }
-  }
-
   /**
    * This method will check and remove InExpression from filterExpression to prevent the List
    * Expression values from serializing and deserializing on executor

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
new file mode 100644
index 0000000..9439ae5
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.TaskContext
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
+import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.spark.util.CommonUtil
+
+class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor)
+  extends TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    dataLoadExecutor.close()
+    CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 6b136bc..3848bad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -370,8 +370,7 @@ class NewDataFrameLoaderRDD[K, V](
         loader.initialize()
         val executor = new DataLoadExecutor
         // in case of success, failure or cancelation clear memory and stop execution
-        context.addTaskCompletionListener { context => executor.close()
-          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        context.addTaskCompletionListener (new InsertTaskCompletionListener(executor))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
new file mode 100644
index 0000000..e4cb3f8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd}
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.memory.UnsafeMemoryManager
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.util.{TaskMetricsMap, ThreadLocalTaskInfo}
+import org.apache.carbondata.spark.InitInputMetrics
+
+class QueryTaskCompletionListener(freeMemory: Boolean,
+    var reader: RecordReader[Void, Object],
+    inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long,
+    queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String)
+  extends TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    if (reader != null) {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception =>
+          LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
+      }
+      reader = null
+    }
+    TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
+    inputMetricsStats.updateAndClose()
+    logStatistics(executionId, taskId, queryStartTime, queryStatisticsRecorder, split)
+    if (freeMemory) {
+      UnsafeMemoryManager.INSTANCE
+        .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+  }
+
+  def logStatistics(
+      executionId: String,
+      taskId: Long,
+      queryStartTime: Long,
+      recorder: QueryStatisticsRecorder,
+      split: Partition
+  ): Unit = {
+    if (null != recorder) {
+      val queryStatistic = new QueryStatistic()
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+        System.currentTimeMillis - queryStartTime)
+      recorder.recordStatistics(queryStatistic)
+      // print executor query statistics for each task_id
+      val statistics = recorder.statisticsForTask(taskId, queryStartTime)
+      if (statistics != null && executionId != null) {
+        Profiler.invokeIfEnable {
+          val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+          inputSplit.calculateLength()
+          val size = inputSplit.getLength
+          val files = inputSplit.getAllSplits.asScala.map { s =>
+            s.getSegmentId + "/" + s.getPath.getName
+          }.toArray[String]
+          Profiler.send(
+            QueryTaskEnd(
+              executionId.toLong,
+              queryId,
+              statistics.getValues,
+              size,
+              files
+            )
+          )
+        }
+      }
+      recorder.logStatisticsForTask(statistics)
+    }
+  }
+}
+