You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/01/17 08:38:34 UTC

[carbondata] branch master updated: [CARBONDATA-3646] [CARBONDATA-3647]: Fix query failure with Index Server

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a4c208  [CARBONDATA-3646] [CARBONDATA-3647]: Fix query failure with Index Server
7a4c208 is described below

commit 7a4c2085f6721ae0d08307c58e56a4954b9bf2ec
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Fri Dec 27 17:02:19 2019 +0530

    [CARBONDATA-3646] [CARBONDATA-3647]: Fix query failure with Index Server
    
    Problems:
    1. Select * query fails when using index server.
    2. Filter query fails with a NullPointerException when using the Index Server on a table
    loaded with global_sort_partition = 100000.
    
    Solution:
    1. Indexservertmp folder configuration has been changed. The Indexservertmp
    folder is now created outside the table, as it is not recommended to keep it inside the table.
    2. table.getAbsoluteTableIdentifier() threw a NullPointerException
    when table was null; this case is now handled.
    
    This closes #3537
---
 .../carbondata/core/datamap/DataMapFilter.java     |  5 +++-
 .../core/datamap/DataMapStoreManager.java          |  2 +-
 .../core/indexstore/ExtendedBlockletWrapper.java   |  6 ++--
 .../TableStatusReadCommittedScope.java             |  2 +-
 .../apache/carbondata/core/util/CarbonUtil.java    | 35 +++++++++++++++-------
 docs/index-server.md                               |  2 +-
 .../carbondata/indexserver/DataMapJobs.scala       |  9 ++----
 .../carbondata/indexserver/IndexServer.scala       |  2 ++
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |  3 +-
 .../command/cache/CarbonDropCacheCommand.scala     | 23 +++++++-------
 10 files changed, 54 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index 1ca9697..4d47565 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -221,9 +222,11 @@ public class DataMapFilter implements Serializable {
    */
   private FilterResolverIntf resolveFilter() {
     try {
+      AbsoluteTableIdentifier absoluteTableIdentifier =
+              table != null ? table.getAbsoluteTableIdentifier() : null;
       FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
       return filterExpressionProcessor
-          .getFilterResolver(expression, table.getAbsoluteTableIdentifier());
+          .getFilterResolver(expression, absoluteTableIdentifier);
     } catch (Exception e) {
       throw new RuntimeException("Error while resolving filter expression", e);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 50c7d6b..7c3ce5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -718,7 +718,7 @@ public final class DataMapStoreManager {
         UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
         SegmentRefreshInfo segmentRefreshInfo;
         if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
-          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
+          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0);
         } else {
           segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index ad4f804..8b51834 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -78,7 +78,8 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
     // executor to driver, in case of any failure data will send through network
     if (bytes.length > serializeAllowedSize && isWriteToFile) {
       final String fileName = UUID.randomUUID().toString();
-      String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+      String folderPath = CarbonUtil.getIndexServerTempPath()
+              + CarbonCommonConstants.FILE_SEPARATOR + queryId;
       try {
         final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
         boolean isFolderExists = true;
@@ -178,7 +179,8 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
       if (isWrittenToFile) {
         DataInputStream stream = null;
         try {
-          final String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+          final String folderPath = CarbonUtil.getIndexServerTempPath()
+                  + CarbonCommonConstants.FILE_SEPARATOR + queryId;
           String fileName = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET);
           stream = FileFactory
               .getDataInputStream(folderPath + "/" + fileName);
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index cde9d8a..63a3c04 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -94,7 +94,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
       throws IOException {
     SegmentRefreshInfo segmentRefreshInfo;
     if (updateVo != null) {
-      segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0);
+      segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0);
     } else {
       segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index babbafa..79daeaa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3320,29 +3320,42 @@ public final class CarbonUtil {
     return UUID.randomUUID().toString();
   }
 
-  public static String getIndexServerTempPath(String tablePath, String queryId) {
+  public static String getIndexServerTempPath() {
     String tempFolderPath = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_TEMP_PATH);
     if (null == tempFolderPath) {
       tempFolderPath =
-          tablePath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/" + queryId;
+          "/tmp/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME;
     } else {
       tempFolderPath =
-          tempFolderPath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/"
-              + queryId;
+          tempFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+              + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME;
     }
-    return tempFolderPath;
+    return CarbonUtil.checkAndAppendFileSystemURIScheme(tempFolderPath);
   }
 
-  public static CarbonFile createTempFolderForIndexServer(String tablePath, String queryId)
-      throws IOException {
-    final String path = getIndexServerTempPath(tablePath, queryId);
-    CarbonFile file = FileFactory.getCarbonFile(path);
+  public static CarbonFile createTempFolderForIndexServer(String queryId)
+          throws IOException {
+    final String path = getIndexServerTempPath();
+    if (queryId == null) {
+      if (!FileFactory.isFileExist(path)) {
+        // Create the new index server temp directory if it does not exist
+        LOGGER.info("Creating Index Server temp folder:" + path);
+        FileFactory
+                .createDirectoryAndSetPermission(path,
+                        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+      }
+      return null;
+    }
+    CarbonFile file = FileFactory.getCarbonFile(path + CarbonCommonConstants.FILE_SEPARATOR
+            + queryId);
     if (!file.mkdirs()) {
-      LOGGER.info("Unable to create table directory for index server");
+      LOGGER.info("Unable to create table directory: " + path + CarbonCommonConstants.FILE_SEPARATOR
+              + queryId);
       return null;
     } else {
-      LOGGER.info("Created index server temp directory" + path);
+      LOGGER.info("Successfully Created directory: " + path + CarbonCommonConstants.FILE_SEPARATOR
+              + queryId);
       return file;
     }
   }
diff --git a/docs/index-server.md b/docs/index-server.md
index 7eba589..f80f67d 100644
--- a/docs/index-server.md
+++ b/docs/index-server.md
@@ -117,7 +117,7 @@ meaning that no matter how small the splits are they would be written to the fil
 be written to file.
 
 The user can set the location for these files by using 'carbon.indexserver.temp.path'. By default
-table path would be used to write the files.
+the files are written in the path /tmp/indexservertmp.
 
 ## Prepriming
 As each query is responsible for caching the pruned datamaps, thus a lot of execution time is wasted in reading the 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 3474795..6d8a467 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -49,8 +49,7 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val splitFolderPath = CarbonUtil
-      .createTempFolderForIndexServer(dataMapFormat.getCarbonTable.getTablePath,
-        dataMapFormat.getQueryId)
+      .createTempFolderForIndexServer(dataMapFormat.getQueryId)
     LOGGER
       .info("Temp folder path for Query ID: " + dataMapFormat.getQueryId + " is " + splitFolderPath)
     val (resonse, time) = logTime {
@@ -67,11 +66,9 @@ class DistributedDataMapJob extends AbstractDataMapJob {
           .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat
             .getQueryId, dataMapFormat.isCountStarJob)
       } finally {
-        val tmpPath = CarbonUtil
-          .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath,
-            dataMapFormat.getQueryId)
         if (null != splitFolderPath && !splitFolderPath.deleteFile()) {
-          LOGGER.error("Problem while deleting the temp directory:" + tmpPath)
+          LOGGER.error("Problem while deleting the temp directory:"
+            + splitFolderPath.getAbsolutePath)
         }
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 2bd6c11..855cb87 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -241,6 +241,8 @@ object IndexServer extends ServerInterface {
       })
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants
         .CARBON_ENABLE_INDEX_SERVER, "true")
+      CarbonProperties.getInstance().addNonSerializableProperty(CarbonCommonConstants
+        .IS_DRIVER_INSTANCE, "true")
       LOGGER.info(s"Index cache server running on ${ server.getPort } port")
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6afee71..6cbb73e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
-import org.apache.spark.sql.execution.command.cache._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.listeners.{AlterDataMaptableCompactionPostListener, DataMapAddColumnsPreListener, DataMapAlterTableDropPartitionMetaListener, DataMapAlterTableDropPartitionPreStatusListener, DataMapChangeDataTypeorRenameColumnPreListener, DataMapDeleteSegmentPreListener, DataMapDropColumnPreListener, DropCacheBloomEventListener, DropCacheDataMapEventListener, LoadMVTablePreListener, LoadPostDataMapListener, PrePrimingEventListener, ShowCacheDataMapEventListener, ShowCachePreM [...]
@@ -71,6 +70,8 @@ class CarbonEnv {
       properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
     }
     LOGGER.info(s"Initializing CarbonEnv, store location: $storePath")
+    // Create the index server temp folder where splits for select queries are written
+    CarbonUtil.createTempFolderForIndexServer(null);
 
     sparkSession.udf.register("getTupleId", () => "")
     // added for handling MV table creation. when user will fire create ddl for
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 7b8e10f..490e03b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -49,18 +49,19 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
     OperationListenerBus.getInstance.fireEvent(dropCacheEvent, operationContext)
 
     val cache = CacheProvider.getInstance().getCarbonCache
+    // Clear cache from IndexServer
+    if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
+      carbonTable.getTableName)) {
+      LOGGER.info("Clearing cache from IndexServer")
+      DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
+    }
     if (cache != null) {
-      // Get all Index files for the specified table.
-      if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
-        carbonTable.getTableName)) {
-        DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
-      } else {
-        // Extract dictionary keys for the table and create cache keys from those
-        val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
-        // Remove elements from cache
-        cache.removeAll(dictKeys.asJava)
-        DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
-      }
+      LOGGER.info("Clearing cache from driver side")
+      // Create cache keys from the extracted dictionary keys for the table.
+      val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
+      // Remove elements from cache
+      cache.removeAll(dictKeys.asJava)
+      DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
     }
     LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
   }