Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/06/25 04:50:20 UTC
[carbondata] branch master updated: [CARBONDATA-3412] Empty results are displayed for non-transactional tables
This is an automated email from the ASF dual-hosted git repository.
xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 78cc014 [CARBONDATA-3412] Empty results are displayed for non-transactional tables
78cc014 is described below
commit 78cc0144ca888363f283a66610bdbea680723346
Author: kunal642 <ku...@gmail.com>
AuthorDate: Tue Jun 4 17:46:59 2019 +0530
[CARBONDATA-3412] Empty results are displayed for non-transactional tables
Problem:
Empty results are displayed for non-transactional tables, because the read
committed scope was always built from the tablestatus file, which
non-transactional tables do not maintain.

Solution:
Added a check for non-transactional tables when generating the read committed
scope, and fixed the TableNotFoundException seen in embedded mode.
This closes #3255
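
In outline: DistributableDataMapFormat previously always built a
TableStatusReadCommittedScope, which resolves segments from the tablestatus
file; a non-transactional table has no tablestatus, so pruning saw zero
segments and every query came back empty. A minimal Scala sketch of the new
selection logic (the actual change is the Java method
DistributableDataMapFormat#initReadCommittedScope in the first hunk below;
the constructor arguments are taken from the diff):

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, ReadCommittedScope, TableStatusReadCommittedScope}

    def readCommittedScopeFor(table: CarbonTable): ReadCommittedScope = {
      if (table.isTransactionalTable) {
        // transactional tables record their segments in the tablestatus file
        new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier,
          FileFactory.getConfiguration)
      } else {
        // non-transactional tables have no tablestatus; list the latest
        // carbondata files under the table path instead
        new LatestFilesReadCommittedScope(table.getTablePath,
          FileFactory.getConfiguration)
      }
    }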
---
.../core/datamap/DistributableDataMapFormat.java | 13 ++++++++++---
.../core/metadata/schema/table/CarbonTable.java | 1 +
.../apache/carbondata/indexserver/IndexServer.scala | 19 +++++++++++++------
.../indexserver/InvalidateSegmentCacheRDD.scala | 10 +++-------
.../carbondata/spark/rdd/CarbonDataRDDFactory.scala | 4 ++--
.../spark/sql/execution/command/cache/CacheUtil.scala | 3 +--
.../execution/command/mutation/DeleteExecution.scala | 4 ++--
7 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 0f82f57..cdc9e5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -301,9 +302,15 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
   private void initReadCommittedScope() throws IOException {
     if (readCommittedScope == null) {
-      this.readCommittedScope =
-          new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-              FileFactory.getConfiguration());
+      if (table.isTransactionalTable()) {
+        this.readCommittedScope =
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                FileFactory.getConfiguration());
+      } else {
+        this.readCommittedScope =
+            new LatestFilesReadCommittedScope(table.getTablePath(),
+                FileFactory.getConfiguration());
+      }
     }
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 47ad582..a2f6b45 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -67,6 +67,7 @@ import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWra
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
/**
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 051b9de..c0646f6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.indexserver
import java.net.InetSocketAddress
import java.security.PrivilegedAction
+import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
@@ -32,7 +33,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapperContainer}
+import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapperContainer
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
@ProtocolInfo(protocolName = "Server", protocolVersion = 1)
@@ -52,8 +54,7 @@ trait ServerInterface {
   /**
    * Invalidate the cache for the specified segments only. Used in case of compaction/Update/Delete.
    */
-  def invalidateSegmentCache(databaseName: String,
-      tableName: String,
+  def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String]): Unit
 }
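
Passing the already-resolved CarbonTable over the RPC boundary, instead of
database and table name strings, is what removes the server-side catalog
lookup. A hedged caller sketch (the segment ids are illustrative; the call
shape mirrors the updated call sites further down):

    // carbonTable was resolved by the caller, e.g. via the load model or command
    val invalidSegmentIds: Array[String] = Array("2", "3")
    IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)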
@@ -112,16 +113,22 @@ object IndexServer extends ServerInterface {
new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
}
-  override def invalidateSegmentCache(databaseName: String, tableName: String,
+  override def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String]): Unit = doAs {
+    val databaseName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getTableName
     val jobgroup: String = "Invalidated Segment Cache for " + databaseName + "." + tableName
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
-    new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, segmentIds.toList)
+    new InvalidateSegmentCacheRDD(sparkSession, carbonTable, segmentIds.toList)
       .collect()
   }

   override def showCache(tableName: String = ""): Array[String] = doAs {
-    val jobgroup: String = "Show Cache for " + tableName
+    val jobgroup: String = "Show Cache " + (tableName match {
+      case "" => "for all tables"
+      case table => s"for $table"
+    })
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", UUID.randomUUID().toString)
     sparkSession.sparkContext.setLocalProperty("spark.job.description", jobgroup)
     new DistributedShowCacheRDD(sparkSession, tableName).collect()
   }
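
Setting a fresh spark.jobGroup.id per invocation gives each show-cache run
its own job group, which is the standard Spark way to track or cancel a
batch of jobs as a unit. A sketch of the pattern (the explicit cancel is
illustrative, not part of this change):

    import java.util.UUID

    // assumes a SparkSession named sparkSession is in scope
    val groupId = UUID.randomUUID().toString
    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", groupId)
    // ... run the show-cache job ...
    sparkSession.sparkContext.cancelJobGroup(groupId) // cancels all jobs in this group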
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index 0c2f877..c2bd589 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -19,8 +19,7 @@ package org.apache.carbondata.indexserver
import scala.collection.JavaConverters._
import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -28,11 +27,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.spark.rdd.CarbonRDD
-class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databaseName: String,
-    tableName: String, invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {
-
-  val carbonTable: CarbonTable = CarbonEnv
-    .getCarbonTable(TableIdentifier(tableName, Some(databaseName)))(ss)
+class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, carbonTable: CarbonTable,
+    invalidSegmentIds: List[String]) extends CarbonRDD[String](ss, Nil) {

   val executorsList: Array[String] = DistributionUtil.getNodeList(ss.sparkContext)
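
The removed CarbonEnv.getCarbonTable(TableIdentifier(...)) lookup is what
threw TableNotFoundException in embedded mode, where the index server runs
without a populated session catalog. With the CarbonTable handed in, building
the RDD is a plain value pass; a hedged usage sketch (segment ids are
illustrative):

    // carbonTable is resolved once by the caller; the RDD no longer touches the catalog
    new InvalidateSegmentCacheRDD(sparkSession, carbonTable, List("0", "1")).collect()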
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f4422a8..447d05b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -258,8 +258,8 @@ object CarbonDataRDDFactory {
if (CarbonProperties.getInstance().isDistributedPruningEnabled(
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
try {
-          IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+          IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
+            .getCarbonDataLoadSchema.getCarbonTable, compactedSegments.asScala.toArray)
} catch {
case ex: Exception =>
LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 1707e78..c257699 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -56,8 +56,7 @@ object CacheUtil {
val invalidSegmentIds = validAndInvalidSegmentsInfo.getInvalidSegments.asScala
.map(_.getSegmentNo).toArray
try {
-        IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName, carbonTable
-          .getTableName, invalidSegmentIds)
+        IndexServer.getClient.invalidateSegmentCache(carbonTable, invalidSegmentIds)
} catch {
case e: Exception =>
LOGGER.warn("Failed to clear cache from executors. ", e)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index e14f465..3de8dd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -321,8 +321,8 @@ object DeleteExecution {
if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
.getDatabaseName, carbonTable.getTableName)) {
try {
-          IndexServer.getClient.invalidateSegmentCache(carbonTable
-            .getDatabaseName, carbonTable.getTableName, segmentsToBeCleared.map(_.getSegmentNo)
+          IndexServer.getClient
+            .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
             .toArray)
} catch {
case _: Exception =>