Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 10:26:08 UTC

[carbondata] branch master updated: [CARBONDATA-3392] Make LRU mandatory for index server

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new df7339c  [CARBONDATA-3392] Make LRU mandatory for index server
df7339c is described below

commit df7339ce005be48dfb440e4cd02f640d6555e887
Author: kunal642 <ku...@gmail.com>
AuthorDate: Wed May 15 16:40:28 2019 +0530

    [CARBONDATA-3392] Make LRU mandatory for index server
    
    Background:
    Currently the LRU cache size is optional for the user to configure, but this raises concerns for the index server because invalid segments have to be constantly removed from the cache in update/delete/compaction scenarios.
    
    Therefore, if the clear segment job fails, the main job should not fail, but there has to be a mechanism to prevent those segments from staying in the cache forever.
    
    To prevent the above-mentioned scenario, the executor LRU cache size is made a mandatory property for the index server application.
    
    This closes #3222
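
A minimal sketch (not part of this commit) of what the new requirement means for deployments: the executor LRU cache size must be visible to CarbonProperties before IndexServer.main runs, otherwise the isExecutorLRUConfigured check introduced below throws a RuntimeException at startup. The object name and the cache size value here are assumptions for illustration only; in practice these properties are normally supplied through carbon.properties.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    object IndexServerConfigSketch {
      def main(args: Array[String]): Unit = {
        val props = CarbonProperties.getInstance()
        // The index server itself must be enabled (already required before this change).
        props.addProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER, "true")
        // With this commit the executor LRU cache size becomes mandatory for the index
        // server; "4096" is an illustrative value, not a recommendation.
        props.addProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE, "4096")
        // With both properties set, IndexServer.main would pass the startup checks
        // instead of throwing a RuntimeException.
      }
    }
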
---
 .../carbondata/core/datamap/DataMapUtil.java       | 10 +++++-
 .../carbondata/core/util/BlockletDataMapUtil.java  |  2 +-
 .../hadoop/api/CarbonTableInputFormat.java         | 39 +++++++++++++---------
 .../carbondata/indexserver/DataMapJobs.scala       | 18 ----------
 .../indexserver/DistributedPruneRDD.scala          | 12 +++++--
 .../carbondata/indexserver/IndexServer.scala       | 19 +++++------
 .../spark/rdd/CarbonDataRDDFactory.scala           | 10 ++++--
 .../sql/execution/command/cache/CacheUtil.scala    | 15 +++++++--
 .../command/cache/CarbonShowCacheCommand.scala     | 23 ++++++++-----
 9 files changed, 86 insertions(+), 62 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index e20f19a..2371a10 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -115,7 +115,15 @@ public class DataMapUtil {
     DistributableDataMapFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
         validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
         dataMapToClear);
-    dataMapJob.execute(dataMapFormat);
+    try {
+      dataMapJob.execute(dataMapFormat);
+    } catch (Exception e) {
+      if (dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
+        LOGGER.warn("Failed to clear distributed cache.", e);
+      } else {
+        throw e;
+      }
+    }
   }
 
   public static void executeClearDataMapJob(CarbonTable carbonTable, String jobClassName)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index c90c3dc..68aad72 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -228,7 +228,7 @@ public class BlockletDataMapUtil {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
     String mergeFilePath =
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getMergeIndexFileName();
+            .getIndexFileName();
     segmentIndexFileStore.readMergeFile(mergeFilePath);
     List<String> indexFiles =
         segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index dd86dcb..274c7ef 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -557,22 +557,31 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
     if (isIUDTable || isUpdateFlow) {
       Map<String, Long> blockletToRowCountMap = new HashMap<>();
-      if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
-          table.getTableName())) {
-        List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
-            getDistributedSplit(table, null, partitions, filteredSegment,
-                allSegments.getInvalidSegments(), toBeCleanedSegments));
-        for (InputSplit extendedBlocklet : extendedBlocklets) {
-          CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
-          String filePath = blocklet.getFilePath();
-          String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
-          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
-              (long) blocklet.getDetailInfo().getRowCount());
+      if (CarbonProperties.getInstance()
+          .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
+        try {
+          List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
+              getDistributedSplit(table, null, partitions, filteredSegment,
+                  allSegments.getInvalidSegments(), toBeCleanedSegments));
+          for (InputSplit extendedBlocklet : extendedBlocklets) {
+            CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+            blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
+                (long) blocklet.getDetailInfo().getRowCount());
+          }
+        } catch (Exception e) {
+          // Check if fallback is disabled then directly throw exception otherwise try driver
+          // pruning.
+          if (CarbonProperties.getInstance().isFallBackDisabled()) {
+            throw e;
+          }
+          TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+          blockletToRowCountMap
+              .putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
         }
       } else {
         TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
-        blockletToRowCountMap.putAll(
-            defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
+        blockletToRowCountMap
+            .putAll(defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap));
       }
       // key is the (segmentId","+blockletPath) and key is the row count of that blocklet
       for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
@@ -603,8 +612,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       }
     } else {
       long totalRowCount = 0L;
-      if (CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
-          table.getTableName())) {
+      if (CarbonProperties.getInstance()
+          .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
         List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
             getDistributedSplit(table, null, partitions, filteredSegment,
                 allSegments.getInvalidSegments(), new ArrayList<String>()));
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index b03beca..57bdf34 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -60,21 +60,3 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
   }
 
 }
-
-class DistributedClearCacheJob extends AbstractDataMapJob {
-
-  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  override def execute(dataMapFormat: DistributableDataMapFormat): util.List[ExtendedBlocklet] = {
-    if (LOGGER.isDebugEnabled) {
-      val messageSize = SizeEstimator.estimate(dataMapFormat)
-      LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
-    }
-    val (response, time) = logTime {
-      IndexServer.getClient.invalidateCache(dataMapFormat)
-      new util.ArrayList[ExtendedBlocklet]()
-    }
-    LOGGER.info(s"Time taken to get response from server: $time ms")
-    response
-  }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index fd59e2b..d2dab2d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -60,7 +61,11 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
   }
 
   override protected def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
+    if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
+    } else {
+      Seq()
+    }
   }
 
   override def internalCompute(split: Partition,
@@ -112,7 +117,10 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
   override protected def internalGetPartitions: Array[Partition] = {
     val job = Job.getInstance(FileFactory.getConfiguration)
     val splits = dataMapFormat.getSplits(job).asScala
-    if (dataMapFormat.isFallbackJob || splits.isEmpty) {
+    val isDistributedPruningEnabled = CarbonProperties.getInstance()
+      .isDistributedPruningEnabled(dataMapFormat.getCarbonTable.getDatabaseName,
+        dataMapFormat.getCarbonTable.getTableName)
+    if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob || splits.isEmpty) {
       splits.zipWithIndex.map {
         f => new DataMapRDDPartition(id, f._2, f._1)
       }.toArray
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 194f4cb..e738fb3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -45,11 +45,6 @@ trait ServerInterface {
   def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
 
   /**
-   * Invalidate the cache for the provided table.
-   */
-  def invalidateCache(request: DistributableDataMapFormat): Unit
-
-  /**
    * Get the cache size for the specified table.
    */
   def showCache(tableName: String) : Array[String]
@@ -85,6 +80,10 @@ object IndexServer extends ServerInterface {
 
   private val numHandlers: Int = CarbonProperties.getInstance().getNumberOfHandlersForIndexServer
 
+  private val isExecutorLRUConfigured: Boolean =
+    CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null
+
   /**
    * Getting sparkSession from ActiveSession because in case of embedded mode the session would
    * have already been created whereas in case of distributed mode the session would be
@@ -103,15 +102,10 @@ object IndexServer extends ServerInterface {
   def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] = doAs {
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
     DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
-    splits.map(_._2)
-  }
-
-  override def invalidateCache(request: DistributableDataMapFormat): Unit = doAs {
-    val splits = new DistributedPruneRDD(sparkSession, request).collect()
-    DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
     if (request.isJobToClearDataMaps) {
       DistributedRDDUtils.invalidateCache(request.getCarbonTable.getTableUniqueName)
     }
+    splits.map(_._2)
   }
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
@@ -131,6 +125,9 @@ object IndexServer extends ServerInterface {
       throw new RuntimeException(
         s"Please set ${ CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER }" +
         s" as true to use index server")
+    } else if (!isExecutorLRUConfigured) {
+      throw new RuntimeException(s"Executor LRU cache size is not set. Please set using " +
+                                 s"${ CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE }")
     } else {
       createCarbonSession()
       LOGGER.info("Starting Index Cache Server")
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 682e76c..f4422a8 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -257,8 +257,14 @@ object CarbonDataRDDFactory {
           // Remove compacted segments from executor cache.
           if (CarbonProperties.getInstance().isDistributedPruningEnabled(
               carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
-            IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+            try {
+              IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
+                carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+            } catch {
+              case ex: Exception =>
+                LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
+                  .getDatabaseName}.${carbonLoadModel.getTableName}", ex)
+            }
           }
           // giving the user his error for telling in the beeline if his triggered table
           // compaction is failed.
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 158bd1f..1707e78 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.command.cache
 
-import org.apache.hadoop.mapred.JobConf
 import scala.collection.JavaConverters._
 
+import org.apache.log4j.Logger
+
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheType
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -35,6 +37,8 @@ import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
 object CacheUtil {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   /**
    * Given a carbonTable, returns the list of all carbonindex files
    *
@@ -51,8 +55,13 @@ object CacheUtil {
         carbonTable.getTableName)) {
         val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
           .map(_.getSegmentNo).toArray
-        IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
-          .getTableName, invalidSegmentIds)
+        try {
+          IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
+            .getTableName, invalidSegmentIds)
+        } catch {
+          case e: Exception =>
+            LOGGER.warn("Failed to clear cache from executors. ", e)
+        }
       }
       validAndInvalidSegmentsInfo.getValidSegments.asScala.flatMap {
         segment =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 6f6ddea..1c3af69 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -218,16 +218,21 @@ case class CarbonShowCacheCommand(tableIdentifier: Option[TableIdentifier],
       case None => ""
     }
     val (result, time) = CarbonScalaUtil.logTime {
-      IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
-        .groupBy(_.head).map { t =>
-        var sum = 0L
-        var length = 0
-        t._2.foreach {
-          arr =>
-            sum += arr(2).toLong
-            length += arr(1).toInt
+      try {
+        IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
+          .groupBy(_.head).map { t =>
+          var sum = 0L
+          var length = 0
+          t._2.foreach {
+            arr =>
+              sum += arr(2).toLong
+              length += arr(1).toInt
+          }
+          (t._1, length, sum)
         }
-        (t._1, length, sum)
+      } catch {
+        case e: Exception =>
+          throw new RuntimeException("Failed to get Cache Information. ", e)
       }
     }
     LOGGER.info(s"Time taken to get cache results from Index Server is $time ms")