Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/06/25 04:50:20 UTC

[carbondata] branch master updated: [CARBONDATA-3412] Empty results are displayed for non-transactional tables

This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 78cc014  [CARBONDATA-3412] Empty results are displayed for non-transactional tables
78cc014 is described below

commit 78cc0144ca888363f283a66610bdbea680723346
Author: kunal642 <ku...@gmail.com>
AuthorDate: Tue Jun 4 17:46:59 2019 +0530

    [CARBONDATA-3412] Empty results are displayed for non-transactional tables
    
    Solution:
    Added a check for non-transactional tables when generating the read
    committed scope (a sketch of the selection logic follows this message).
    
    Fixed TableNotFoundException in embedded mode by passing the CarbonTable
    to the index server directly instead of resolving it from the SparkSession
    catalog (see the sketch after the change summary below).
    
    This closes #3255
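
For context on the Solution above: non-transactional (SDK-written) tables carry no
tablestatus file, so a tablestatus-based scope finds zero segments and every query
comes back empty. Below is a minimal, self-contained Scala sketch of the selection
that the first hunk adds in Java (DistributableDataMapFormat.initReadCommittedScope);
the trait and case classes here are simplified stand-ins, not the real CarbonData
types.

    object ReadCommittedScopeSketch {
      sealed trait ReadCommittedScope
      case class TableStatusReadCommittedScope(tablePath: String) extends ReadCommittedScope
      case class LatestFilesReadCommittedScope(tablePath: String) extends ReadCommittedScope

      def readCommittedScopeFor(tablePath: String,
          isTransactionalTable: Boolean): ReadCommittedScope = {
        if (isTransactionalTable) {
          // Transactional tables record their segments in a tablestatus file.
          TableStatusReadCommittedScope(tablePath)
        } else {
          // Non-transactional tables have no tablestatus file, so a
          // tablestatus-based scope sees zero segments and queries return
          // empty results; list the latest carbondata files on disk instead.
          LatestFilesReadCommittedScope(tablePath)
        }
      }

      def main(args: Array[String]): Unit = {
        println(readCommittedScopeFor("/tmp/sdk_output", isTransactionalTable = false))
      }
    }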
---
 .../core/datamap/DistributableDataMapFormat.java      | 13 ++++++++++---
 .../core/metadata/schema/table/CarbonTable.java       |  1 +
 .../apache/carbondata/indexserver/IndexServer.scala   | 19 +++++++++++++------
 .../indexserver/InvalidateSegmentCacheRDD.scala       | 10 +++-------
 .../carbondata/spark/rdd/CarbonDataRDDFactory.scala   |  4 ++--
 .../spark/sql/execution/command/cache/CacheUtil.scala |  3 +--
 .../execution/command/mutation/DeleteExecution.scala  |  4 ++--
 7 files changed, 32 insertions(+), 22 deletions(-)
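
The TableNotFoundException fix is the signature change threaded through the Scala
files above: callers now ship the already-resolved CarbonTable over the RPC, so
InvalidateSegmentCacheRDD no longer looks the table up via CarbonEnv and the
SparkSession catalog, a lookup that fails when the index server runs embedded and
the table has no catalog entry there. A stand-in sketch of the interface change
(the types below are simplified placeholders, not the real CarbonData classes):

    object InvalidateCacheSketch {
      // Placeholder for org.apache.carbondata.core.metadata.schema.table.CarbonTable.
      final case class CarbonTable(databaseName: String, tableName: String)

      trait ServerInterface {
        // Old shape: invalidateSegmentCache(databaseName: String, tableName: String,
        //            segmentIds: Array[String]) forced a catalog lookup on the server.
        def invalidateSegmentCache(carbonTable: CarbonTable, segmentIds: Array[String]): Unit
      }

      object EchoServer extends ServerInterface {
        def invalidateSegmentCache(carbonTable: CarbonTable,
            segmentIds: Array[String]): Unit =
          println(s"invalidating ${segmentIds.length} segment(s) of " +
            s"${carbonTable.databaseName}.${carbonTable.tableName}")
      }

      def main(args: Array[String]): Unit = {
        EchoServer.invalidateSegmentCache(CarbonTable("default", "t1"), Array("0", "1"))
      }
    }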

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 0f82f57..cdc9e5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -301,9 +302,15 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private void initReadCommittedScope() throws IOException {
     if (readCommittedScope == null) {
-      this.readCommittedScope =
-          new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-              FileFactory.getConfiguration());
+      if (table.isTransactionalTable()) {
+        this.readCommittedScope =
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                FileFactory.getConfiguration());
+      } else {
+        this.readCommittedScope =
+            new LatestFilesReadCommittedScope(table.getTablePath(),
+                FileFactory.getConfiguration());
+      }
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 47ad582..a2f6b45 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -67,6 +67,7 @@ import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWra
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 
 /**
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 051b9de..c0646f6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.indexserver
 
 import java.net.InetSocketAddress
 import java.security.PrivilegedAction
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
@@ -32,7 +33,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DistributableDataMapFormat
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapperContainer}
+import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapperContainer
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 
 @ProtocolInfo(protocolName = "Server", protocolVersion = 1)
@@ -52,8 +54,7 @@ trait ServerInterface {
   /**
    * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
-  def invalidateSegmentCache(databaseName: String,
-      tableName: String,
+  def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String]): Unit
 }
 
@@ -112,16 +113,22 @@ object IndexServer extends ServerInterface {
     new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
   }
 
-  override def invalidateSegmentCache(databaseName: String, tableName: String,
+  override def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String]): Unit = doAs {
+    val databaseName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
     val jobgroup: String = " Invalided Segment Cache for " + databaseName + "." + tableName
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
-    new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
+    new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
       .collect()
   }
 
   override def showCache(tableName: String = ""): Array[String] = doAs {
-    val jobgroup: String = "Show Cache for " + tableName
+    val jobgroup: String = "Show Cache " + (tableName match {
+      case "" => "for all tables"
+      case table => s"for $table"
+    })
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", UUID.randomUUID().toString)
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new DistributedShowCacheRDD(sparkSession, tableName).collect()
   }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index 0c2f877..c2bd589 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -19,8 +19,7 @@ package org.apache.carbondata.indexserver
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -28,11 +27,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.spark.rdd.CarbonRDD
 
-class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databaseName: String,
-    tableName: String, invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {
-
-  val carbonTable: CarbonTable = CarbonEnv
-    .getCarbonTable(TableIdentifier(tableName, Some(databaseName)))(ss)
+class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, carbonTable: CarbonTable,
+    invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {
 
   val executorsList: Array[String] = DistributionUtil.getNodeList(ss.sparkContext)
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f4422a8..447d05b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -258,8 +258,8 @@ object CarbonDataRDDFactory {
           if (CarbonProperties.getInstance().isDistributedPruningEnabled(
               carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
             try {
-              IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
-                carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+              IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
+                .getCarbonDataLoadSchema.getCarbonTable, compactedSegments.asScala.toArray)
             } catch {
               case ex: Exception =>
                 LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 1707e78..c257699 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -56,8 +56,7 @@ object CacheUtil {
         val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
           .map(_.getSegmentNo).toArray
         try {
-          IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
-            .getTableName, invalidSegmentIds)
+          IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)
         } catch {
           case e: Exception =>
             LOGGER.warn("Failed to clear cache from executors. ", e)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index e14f465..3de8dd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -321,8 +321,8 @@ object DeleteExecution {
     if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
       .getDatabaseName, carbonTable.getTableName)) {
       try {
-        IndexServer.getClient.invalidateSegmentCache(carbonTable
-          .getDatabaseName, carbonTable.getTableName, segmentsToBeCleared.map(_.getSegmentNo)
+        IndexServer.getClient
+          .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
           .toArray)
       } catch {
         case _: Exception =>