Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/07 13:09:57 UTC

[33/50] [abbrv] carbondata git commit: [CARBONDATA-2803] Fix wrong data size calculation, refactor for better readability, and handle local dictionary for older tables

[CARBONDATA-2803] Fix wrong data size calculation, refactor for better readability, and handle local dictionary for older tables

Changes in this PR:
1. Data size was calculated wrongly: the index map contains duplicate paths because it stores one entry per blocklet, so duplicates are removed and unique block paths are kept for a correct data size calculation.
2. Refactored code in CarbonTableInputFormat for better readability.
3. If the local dictionary enable property is null in the table properties, it is an old table, so the value "false" is put into the properties map.

This closes #2583


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e93d7b8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e93d7b8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e93d7b8

Branch: refs/heads/external-format
Commit: 7e93d7b8707c36bf3f8d1f153b67a8cb997fa0f4
Parents: bd6abbb
Author: akashrn5 <ak...@gmail.com>
Authored: Mon Jul 30 19:41:34 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Aug 2 17:00:05 2018 +0530

----------------------------------------------------------------------
 .../indexstore/blockletindex/BlockDataMap.java  |  2 +-
 .../core/metadata/SegmentFileStore.java         | 23 ++------
 .../core/metadata/schema/table/CarbonTable.java | 10 ++--
 .../hadoop/api/CarbonTableInputFormat.java      | 61 ++++++++++++--------
 .../FlatFolderTableLoadingTestCase.scala        | 31 ++++++++++
 5 files changed, 81 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index f4bb58e..0875e75 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -588,7 +588,7 @@ public class BlockDataMap extends CoarseGrainDataMap
 
   private boolean useMinMaxForExecutorPruning(FilterResolverIntf filterResolverIntf) {
     boolean useMinMaxForPruning = false;
-    if (this instanceof BlockletDataMap) {
+    if (!isLegacyStore && this instanceof BlockletDataMap) {
       useMinMaxForPruning = BlockletDataMapUtil
           .useMinMaxForBlockletPruning(filterResolverIntf, getMinMaxCacheColumns());
     }
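
The one-line change above gates min/max executor pruning on the store format as well as the data map type: legacy-store segments now never opt in, even for a BlockletDataMap. A minimal sketch of the decision as a standalone predicate (hypothetical form; in the real method both facts come from BlockDataMap state, and a true result additionally consults BlockletDataMapUtil.useMinMaxForBlockletPruning):

    // Hypothetical sketch: the guard from the hunk above as a pure predicate.
    static boolean useMinMaxForExecutorPruning(boolean isLegacyStore,
        boolean isBlockletDataMap) {
      return !isLegacyStore && isBlockletDataMap;
    }

    // useMinMaxForExecutorPruning(true,  true)  -> false (legacy store never prunes by min/max)
    // useMinMaxForExecutorPruning(false, true)  -> true
    // useMinMaxForExecutorPruning(false, false) -> false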

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 111e444..1acf0ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -16,22 +16,9 @@
  */
 package org.apache.carbondata.core.metadata;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
+import java.io.*;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -511,11 +498,13 @@ public class SegmentFileStore {
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
       List<DataFileFooter> indexInfo =
           fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
-      List<String> blocks = new ArrayList<>();
+      // the carbonindex file stores one entry per blocklet, so block file
+      // names will be duplicated; use a set to remove duplicates
+      Set<String> blocks = new LinkedHashSet<>();
       for (DataFileFooter footer : indexInfo) {
         blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
       }
-      indexFilesMap.put(entry.getKey(), blocks);
+      indexFilesMap.put(entry.getKey(), new ArrayList<>(blocks));
       boolean added = false;
       for (Map.Entry<String, List<String>> mergeFile : indexFileStore
           .getCarbonMergeFileToIndexFilesMap().entrySet()) {
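
The switch above from a List to a LinkedHashSet is what fixes the data size computation: a carbonindex file carries one entry per blocklet, so every blocklet of a block repeats the same file path, and summing file sizes over those entries counts each block several times. A self-contained sketch of the effect (hypothetical path and size, not CarbonData APIs):

    import java.util.*;

    // Hypothetical sketch: why per-blocklet index entries inflate the data
    // size, and why deduplicating block paths corrects it.
    public class DataSizeSketch {
      public static void main(String[] args) {
        // One 100 MB block indexed by four blocklets: four entries, same path.
        List<String> perBlockletPaths = Arrays.asList(
            "/store/part-0-0.carbondata", "/store/part-0-0.carbondata",
            "/store/part-0-0.carbondata", "/store/part-0-0.carbondata");
        long blockSizeMb = 100L; // assumed block size
        System.out.println(perBlockletPaths.size() * blockSizeMb); // 400 -> wrong
        // LinkedHashSet drops duplicates while preserving insertion order.
        Set<String> uniqueBlocks = new LinkedHashSet<>(perBlockletPaths);
        System.out.println(uniqueBlocks.size() * blockSizeMb);     // 100 -> right
      }
    }

A LinkedHashSet rather than a plain HashSet presumably keeps the block list stored in indexFilesMap in the order the footers were read.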

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 850a791..14052f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1160,10 +1160,11 @@ public class CarbonTable implements Serializable {
    * @param tableInfo
    */
   private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
-    String isLocalDictionaryEnabled = tableInfo.getFactTable().getTableProperties()
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
-    String localDictionaryThreshold = tableInfo.getFactTable().getTableProperties()
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
+    Map<String, String> tableProperties = tableInfo.getFactTable().getTableProperties();
+    String isLocalDictionaryEnabled =
+        tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictionaryThreshold =
+        tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
     if (null != isLocalDictionaryEnabled) {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean(isLocalDictionaryEnabled));
       if (null != localDictionaryThreshold) {
@@ -1176,6 +1177,7 @@ public class CarbonTable implements Serializable {
       // in case of old tables, local dictionary enable property will not be present in
       // tableProperties, so disable the local dictionary generation
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false");
     }
   }
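
The added put() above means an old table no longer leaves LOCAL_DICTIONARY_ENABLE absent from its table properties: the disabled default is written back, so later readers of the map see an explicit "false" instead of null. A minimal sketch of that pattern (hypothetical property map and key, not the CarbonTable API):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: default a missing boolean property and record the
    // default in the map itself.
    public class LocalDictDefaultSketch {
      static final String LOCAL_DICTIONARY_ENABLE = "local_dictionary_enable";

      public static void main(String[] args) {
        Map<String, String> tableProperties = new HashMap<>(); // old table: key absent
        String enabled = tableProperties.get(LOCAL_DICTIONARY_ENABLE);
        if (enabled == null) {
          // Old table: disable local dictionary and make the choice explicit.
          tableProperties.put(LOCAL_DICTIONARY_ENABLE, "false");
          enabled = "false";
        }
        System.out.println(Boolean.parseBoolean(enabled)); // false
      }
    }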
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f53a1d7..be72983 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -198,30 +198,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false,
             readCommittedScope);
     // Clean the updated segments from memory if the update happens on segments
-    List<Segment> toBeCleanedSegments = new ArrayList<>();
-    for (Segment filteredSegment : filteredSegmentToAccess) {
-      boolean refreshNeeded =
-          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-              .isRefreshNeeded(filteredSegment,
-                  updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
-      if (refreshNeeded) {
-        toBeCleanedSegments.add(filteredSegment);
-      }
-    }
-    // Clean segments if refresh is needed
-    for (Segment segment : filteredSegmentToAccess) {
-      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-          .isRefreshNeeded(segment.getSegmentNo())) {
-        toBeCleanedSegments.add(segment);
-      }
-    }
-
-
-    if (toBeCleanedSegments.size() > 0) {
-      DataMapStoreManager.getInstance()
-          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
-              toBeCleanedSegments);
-    }
+    refreshSegmentCacheIfRequired(job, carbonTable, updateStatusManager, filteredSegmentToAccess);
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
@@ -266,6 +243,42 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   }
 
   /**
+   * Method to check and refresh segment cache
+   *
+   * @param job
+   * @param carbonTable
+   * @param updateStatusManager
+   * @param filteredSegmentToAccess
+   * @throws IOException
+   */
+  public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTable,
+      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
+      throws IOException {
+    List<Segment> toBeCleanedSegments = new ArrayList<>();
+    for (Segment filteredSegment : filteredSegmentToAccess) {
+      boolean refreshNeeded =
+          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
+              .isRefreshNeeded(filteredSegment,
+                  updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(filteredSegment);
+      }
+    }
+    // Clean segments if refresh is needed
+    for (Segment segment : filteredSegmentToAccess) {
+      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
+          .isRefreshNeeded(segment.getSegmentNo())) {
+        toBeCleanedSegments.add(segment);
+      }
+    }
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
+  }
+
+  /**
    * Below method will be used to get the filter segments when query is fired on pre Aggregate
    * and main table in case of streaming.
    * For Pre Aggregate rules it will set all the valid segments for both streaming and
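
Because the extracted refreshSegmentCacheIfRequired is public, the segment-cache cleanup can now be exercised on its own (for example from a test) rather than only as a side effect of getSplits. A hedged caller sketch; the class name RefreshSketch is hypothetical, and the import paths are assumed from this branch's layout:

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.datamap.Segment;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    import org.apache.hadoop.mapreduce.JobContext;

    // Hypothetical caller of the newly extracted public method; all four
    // arguments are assumed to be prepared by the surrounding code.
    public class RefreshSketch {
      static void refreshCache(JobContext job, CarbonTable carbonTable,
          SegmentUpdateStatusManager updateStatusManager, List<Segment> segments)
          throws IOException {
        new CarbonTableInputFormat<Object>()
            .refreshSegmentCacheIfRequired(job, carbonTable, updateStatusManager,
                segments);
      }
    }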

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 68f8ca7..97bcb5f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -149,7 +149,38 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table flatfolder_delete")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+    sql("drop table if exists flatfolder_delete")
+  }
 
+  test("merge index flat folder and delete delta issue with GLOBAL SORT") {
+    sql("drop table if exists flatfolder_delete")
+    sql(
+      """
+        | CREATE TABLE flatfolder_delete (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true', 'SORT_SCOPE'='GLOBAL_SORT' )
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "flatfolder_delete")
+    sql(s"""delete from flatfolder_delete where empname='anandh'""")
+    sql(s"""delete from flatfolder_delete where empname='arvind'""")
+    sql(s"""select * from flatfolder_delete""").show()
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 8)
+    sql("Alter table flatfolder_delete compact 'minor'")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 8)
+    sql("clean files for table flatfolder_delete")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+    sql("drop table if exists flatfolder_delete")
   }
 
   override def afterAll = {