Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/18 12:56:29 UTC

[1/2] incubator-carbondata git commit: add SORT_COLUMNS option in dataframe

Repository: incubator-carbondata
Updated Branches:
  refs/heads/12-dev 8843aecfe -> ebf70bca8


add SORT_COLUMNS option in dataframe


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/be5904f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/be5904f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/be5904f8

Branch: refs/heads/12-dev
Commit: be5904f8ee9bab3c226cedba89d309c23f0aefae
Parents: 8843aec
Author: jackylk <ja...@huawei.com>
Authored: Tue Apr 18 07:26:43 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Apr 18 07:26:43 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/SegmentTaskIndexStore.java   |  17 ---
 .../core/mutate/CarbonUpdateUtil.java           | 122 +++++++++----------
 .../statusmanager/SegmentStatusManager.java     |  17 +--
 .../carbondata/examples/CompareTest.scala       |   2 +
 .../carbondata/hadoop/CarbonInputSplit.java     |  10 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  28 +++++
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  12 +-
 .../processing/merger/CarbonDataMergerUtil.java |  82 ++++++-------
 9 files changed, 148 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
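In short: this commit threads a new "sort_columns" option from the Spark DataFrameWriter through CarbonOption into the CREATE TABLE statement that CarbonDataFrameWriter generates, so SORT_COLUMNS can be set when a table is created from a dataframe; an empty string disables sorting, as the new tests exercise. A minimal usage sketch, assembled from the tests added in TestLoadDataFrame.scala below (the dataframe df and its columns c2/c3 are as defined there):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("carbondata")
      .option("tableName", "carbon3")
      .option("sort_columns", "c2, c3")  // "" would disable sorting at load time
      .mode(SaveMode.Overwrite)
      .save()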


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 862455e..334efb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
 
 /**
@@ -140,22 +139,6 @@ public class SegmentTaskIndexStore
   }
 
   /**
-   * returns block timestamp value from the given task
-   * @param taskKey
-   * @param listOfUpdatedFactFiles
-   * @return
-   */
-  private String getTimeStampValueFromBlock(String taskKey, List<String> listOfUpdatedFactFiles) {
-    for (String blockName : listOfUpdatedFactFiles) {
-      if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) {
-        blockName = blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.'));
-        return blockName;
-      }
-    }
-    return null;
-  }
-
-  /**
    * Below method will be used to load the segment of segments
    * One segment may have multiple task , so  table segment will be loaded
    * based on task id and will return the map of taksId to table segment

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index af26035..72c750f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -54,7 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 public class CarbonUpdateUtil {
 
   private static final LogService LOGGER =
-          LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
+      LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
 
   /**
    * returns required filed from tuple id
@@ -86,11 +86,11 @@ public class CarbonUpdateUtil {
    */
   public static String getTableBlockPath(String tid, String factPath) {
     String part =
-            CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+        CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
     String segment =
-            CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
+        CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
     return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
-            + CarbonCommonConstants.FILE_SEPARATOR + segment;
+        + CarbonCommonConstants.FILE_SEPARATOR + segment;
 
   }
 
@@ -103,7 +103,7 @@ public class CarbonUpdateUtil {
    * @return
    */
   public static String getDeleteDeltaFilePath(String blockPath, String blockName,
-                                              String timestamp) {
+      String timestamp) {
     return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
         + CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
 
@@ -119,7 +119,7 @@ public class CarbonUpdateUtil {
       CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
     boolean status = false;
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+        new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
     boolean lockStatus = false;
 
@@ -130,12 +130,12 @@ public class CarbonUpdateUtil {
         AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
 
         CarbonTablePath carbonTablePath = CarbonStorePath
-                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                        absoluteTableIdentifier.getCarbonTableIdentifier());
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
 
         // read the existing file if present and update the same.
         SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
-                .getUpdateStatusDetails();
+            .getUpdateStatusDetails();
 
         List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails));
 
@@ -187,9 +187,9 @@ public class CarbonUpdateUtil {
    * @return
    */
   public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
-                                                  CarbonTable table, String updatedTimeStamp,
-                                                  boolean isTimestampUpdationRequired,
-                                                  List<String> segmentsToBeDeleted) {
+      CarbonTable table, String updatedTimeStamp,
+      boolean isTimestampUpdationRequired,
+      List<String> segmentsToBeDeleted) {
 
     boolean status = false;
 
@@ -198,8 +198,8 @@ public class CarbonUpdateUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
 
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
 
@@ -211,11 +211,11 @@ public class CarbonUpdateUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
-                        + " for table status updation");
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+            segmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
 
@@ -223,7 +223,7 @@ public class CarbonUpdateUtil {
             // we are storing the link between the 2 status files in the segment 0 only.
             if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
               loadMetadata.setUpdateStatusFileName(
-                      CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+                  CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
             }
 
             // if the segments is in the list of marked for delete then update the status.
@@ -252,7 +252,7 @@ public class CarbonUpdateUtil {
 
         try {
           segmentStatusManager
-                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+              .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;
         }
@@ -260,18 +260,18 @@ public class CarbonUpdateUtil {
         status = true;
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+            .getDatabaseName() + "." + table.getFactTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
-                 "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
+              "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                  + "." + table.getFactTableName());
         } else {
           LOGGER.error(
-                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
+              "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+                  .getFactTableName() + " during table status updation");
         }
       }
     }
@@ -287,7 +287,7 @@ public class CarbonUpdateUtil {
    */
   public static String getUpdateStatusFileName(String updatedTimeStamp) {
     return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
-            + updatedTimeStamp;
+        + updatedTimeStamp;
   }
 
   /**
@@ -301,13 +301,13 @@ public class CarbonUpdateUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
     // as of now considering only partition 0.
     String partitionId = "0";
     String partitionDir = carbonTablePath.getPartitionDir(partitionId);
     CarbonFile file =
-            FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
+        FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
     if (!file.exists()) {
       return;
     }
@@ -317,8 +317,8 @@ public class CarbonUpdateUtil {
         @Override public boolean accept(CarbonFile file) {
           String fileName = file.getName();
           return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+              || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+              || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
         }
       });
       // deleting the files of a segment.
@@ -390,7 +390,7 @@ public class CarbonUpdateUtil {
 
     // scan all the carbondata files and get the latest task ID.
     CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+        FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
     CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
 
@@ -420,7 +420,7 @@ public class CarbonUpdateUtil {
 
     // scan all the carbondata files and get the latest task ID.
     CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+        FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
 
     CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
@@ -495,8 +495,8 @@ public class CarbonUpdateUtil {
     SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
-                    table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+        .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
+            table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
 
     LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
 
@@ -513,23 +513,23 @@ public class CarbonUpdateUtil {
       // if the segment is mark for delete or compacted then any way it will get deleted.
 
       if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              || segment.getLoadStatus()
-              .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+          || segment.getLoadStatus()
+          .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
 
         // take the list of files from this segment.
         String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
         CarbonFile segDir =
-                FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+            FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
         CarbonFile[] allSegmentFiles = segDir.listFiles();
 
         // scan through the segment and find the carbondatafiles and index files.
         SegmentUpdateStatusManager updateStatusManager =
-                new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
 
         // get Invalid update  delta files.
         CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+            .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
 
         // now for each invalid delta file need to check the query execution time out
         // and then delete.
@@ -541,8 +541,8 @@ public class CarbonUpdateUtil {
 
         // do the same for the index files.
         CarbonFile[] invalidIndexFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+            .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
 
         // now for each invalid index file need to check the query execution time out
         // and then delete.
@@ -571,16 +571,16 @@ public class CarbonUpdateUtil {
           // case 1
           if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
             completeListOfDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
-                            allSegmentFiles);
+                .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
+                    allSegmentFiles);
             for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
 
               compareTimestampsAndDelete(invalidFile, forceDelete, false);
             }
 
             CarbonFile[] blockRelatedFiles = updateStatusManager
-                    .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
-                            block.getActualBlockName());
+                .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
+                    block.getActualBlockName());
 
             // now for each invalid index file need to check the query execution time out
             // and then delete.
@@ -593,8 +593,8 @@ public class CarbonUpdateUtil {
 
           } else {
             invalidDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
-                            allSegmentFiles);
+                .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+                    allSegmentFiles);
             for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
 
               compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -608,10 +608,10 @@ public class CarbonUpdateUtil {
     if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
 
       final String updateStatusTimestamp = validUpdateStatusFile
-              .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
+          .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
 
       CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(),
-              FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
+          FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
 
       CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
         @Override public boolean accept(CarbonFile file) {
@@ -645,7 +645,7 @@ public class CarbonUpdateUtil {
     int maxTime;
     try {
       maxTime = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
     } catch (NumberFormatException e) {
       maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
     }
@@ -665,15 +665,15 @@ public class CarbonUpdateUtil {
    * @param isUpdateStatusFile if true then the parsing of file name logic changes.
    */
   private static void compareTimestampsAndDelete(CarbonFile invalidFile,
-                                                 boolean forceDelete, boolean isUpdateStatusFile) {
+      boolean forceDelete, boolean isUpdateStatusFile) {
     long fileTimestamp = 0L;
 
     if (isUpdateStatusFile) {
       fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
-              .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
+          .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
     } else {
       fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
-              CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
+          CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
     }
 
     // if the timestamp of the file is more than the current time by query execution timeout.
@@ -698,7 +698,7 @@ public class CarbonUpdateUtil {
    */
   public static boolean isBlockInvalid(String blockStatus) {
     if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
-            .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+        .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
       return true;
     }
     return false;
@@ -718,7 +718,7 @@ public class CarbonUpdateUtil {
    * @param segmentBlockCount
    */
   public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
-                                                Map<String, Long> segmentBlockCount) {
+      Map<String, Long> segmentBlockCount) {
 
     String segId = details.getSegmentName();
 
@@ -751,12 +751,12 @@ public class CarbonUpdateUtil {
    * @param segmentUpdateStatusManager
    */
   public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
-                                           SegmentUpdateStatusManager segmentUpdateStatusManager) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager) {
 
     Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
 
     Map<String, RowCountDetailsVO> outputMap =
-            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
       String key = blockRowEntry.getKey();
@@ -771,7 +771,7 @@ public class CarbonUpdateUtil {
       }
 
       RowCountDetailsVO rowCountDetailsVO =
-              new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
+          new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
       outputMap.put(key, rowCountDetailsVO);
 
     }
@@ -789,8 +789,8 @@ public class CarbonUpdateUtil {
   public static String getSegmentBlockNameKey(String segID, String blockName) {
 
     String blockNameWithOutPart = blockName
-            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
-                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+        .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+            blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
 
     return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
 

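The CarbonUpdateUtil hunks above are whitespace-only: hand-aligned continuation lines move to the project's four-space continuation indent, and wrapped method parameters follow the same rule; no behavior changes. One convention worth noting from the otherwise untouched getDeleteDeltaFilePath hunk, restated as a sketch (the literal "/", "-", and ".deletedelta" stand in for CarbonCommonConstants.FILE_SEPARATOR, HYPHEN, and DELETE_DELTA_FILE_EXT):

    // Delete-delta files sit next to their block and carry the operation
    // timestamp in the name: <blockPath>/<blockName>-<timestamp>.deletedelta
    def deleteDeltaFilePath(blockPath: String, blockName: String, timestamp: String): String =
      s"$blockPath/$blockName-$timestamp.deletedelta"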
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c2c41e5..658ff8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -562,6 +562,7 @@ public class SegmentStatusManager {
     }
   }
 
+
   /**
    * This API will return the update status file name.
    * @param segmentList
@@ -582,22 +583,6 @@ public class SegmentStatusManager {
     return "";
   }
 
-  /**
-   * getting the task numbers present in the segment.
-   * @param segmentId
-   * @return
-   */
-  public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
-          updateStatusManager) {
-    List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
-    for (String eachFileName : list) {
-      taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
-    }
-    return taskList;
-  }
-
-
   public static class ValidAndInvalidSegmentsInfo {
     private final List<String> listOfValidSegments;
     private final List<String> listOfValidUpdatedSegments;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index 82bd02a..f577fcf 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -266,6 +266,7 @@ object CompareTest {
           .option("tempCSV", "false")
           .option("single_pass", "true")
           .option("dictionary_exclude", "id") // id is high cardinality column
+          .option("sort_columns", "")
           .mode(SaveMode.Overwrite)
           .save()
     }
@@ -278,6 +279,7 @@ object CompareTest {
     val loadParquetTime = loadParquetTable(spark, df)
     val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
     println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+    df.unpersist()
     spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..e89da75 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -231,14 +231,14 @@ public class CarbonInputSplit extends FileSplit
     }
 
     // Comparing the time task id of the file to other
-    // if both the task id of the file is same then we need to compare the
-    // offset of
-    // the file
+    // if both the task id of the file is same then we need to compare the offset of the file
     String filePath1 = this.getPath().getName();
     String filePath2 = other.getPath().getName();
     if (CarbonTablePath.isCarbonDataFile(filePath1)) {
-      int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
-      int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+      int firstTaskId =
+          Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]);
+      int otherTaskId =
+          Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]);
       if (firstTaskId != otherTaskId) {
         return firstTaskId - otherTaskId;
       }

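The CarbonInputSplit change is a functional fix rather than a style cleanup: the task number parsed out of a carbondata file name may now carry an underscore-separated suffix, so the comparator takes only the leading numeric part before calling Integer.parseInt. A sketch of the resulting ordering (the task-number strings here are hypothetical):

    // Only the digits before the first underscore decide task ordering;
    // when task ids tie, the comparator falls back to the file offset.
    def taskId(taskNo: String): Int = taskNo.split("_")(0).toInt

    assert(taskId("3_0") == 3)              // suffixed form parses cleanly
    assert(taskId("3") == 3)                // unsuffixed form still works
    assert(taskId("2_1") < taskId("10_0"))  // numeric, not lexicographic, order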
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index b790131..d8c70b5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -111,6 +111,34 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test load dataframe without sort") {
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon3")
+        .option("sort_columns", "")
+        .mode(SaveMode.Overwrite)
+        .save()
+    sql("select count(*) from carbon3 where c3 > 400").show
+    df.registerTempTable("temp")
+    sql("select count(*) from temp where c3 > 400").show
+    //sql("select * from carbon3 where c3 > 500").show
+    checkAnswer(
+      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+    )
+  }
+
+  test("test load dataframe using sort_columns") {
+    df.write
+        .format("carbondata")
+        .option("tableName", "carbon3")
+        .option("sort_columns", "c2, c3")
+        .mode(SaveMode.Overwrite)
+        .save()
+    checkAnswer(
+      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+    )
+  }
+
   test("test decimal values for dataframe load"){
     dataFrame.write
       .format("carbondata")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index c29c1a2..93c2d18 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -57,5 +57,7 @@ class CarbonOption(options: Map[String, String]) {
   def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
                                     options.contains("bucketnumber")
 
+  def sortColumns: Option[String] = options.get("sort_columns")
+
   def toMap: Map[String, String] = options
 }

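CarbonOption wraps the raw option map, so the new accessor is a plain lookup that stays None when the key is absent. A minimal check of the behavior (illustration only; the table name is hypothetical):

    val withSort = new CarbonOption(Map("tableName" -> "t1", "sort_columns" -> "c2, c3"))
    assert(withSort.sortColumns == Some("c2, c3"))

    // An absent key yields None, which the writer below filters out of
    // the generated TBLPROPERTIES:
    val noSort = new CarbonOption(Map("tableName" -> "t1"))
    assert(noSort.sortColumns.isEmpty)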
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 576da58..b1e9ada 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.execution.command.LoadTable
@@ -33,8 +35,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
 
   def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
     // create a new table using dataframe's schema and write its content into the table
-    sqlContext.sparkSession.sql(
-      makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
+    val sqlString = makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))
+    sqlContext.sparkSession.sql(sqlString)
     writeToCarbonFile(parameters)
   }
 
@@ -84,7 +86,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
 
     try {
-      sqlContext.sql(makeLoadString(tempCSVFolder, options))
+      val sqlString = makeLoadString(tempCSVFolder, options)
+      sqlContext.sql(sqlString)
     } finally {
       fs.delete(tempCSVPath, true)
     }
@@ -164,7 +167,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     }
     val property = Map(
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
-      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
+      "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+      "SORT_COLUMNS" -> options.sortColumns
     ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
     s"""
        | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}

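The writer change has two parts: the generated CREATE TABLE and LOAD statements are bound to local vals first (convenient for logging or debugging, with no change in behavior), and SORT_COLUMNS joins DICTIONARY_INCLUDE/DICTIONARY_EXCLUDE in the property map so it lands in the table's TBLPROPERTIES. How the property fragment comes out, mirroring the hunk above (the option values are hypothetical):

    val property = Map(
      "DICTIONARY_INCLUDE" -> None,       // option not set, so filtered out
      "DICTIONARY_EXCLUDE" -> Some("id"),
      "SORT_COLUMNS" -> Some("c2, c3")
    ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
    // property == "'DICTIONARY_EXCLUDE' = 'id','SORT_COLUMNS' = 'c2, c3'"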
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 2b81eec..a6a9b88 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -853,12 +853,12 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
-          throws IOException {
+      throws IOException {
 
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
     try {
       validAndInvalidSegments =
-              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+          new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
     } catch (IOException e) {
       LOGGER.error("Error while getting valid segment list for a table identifier");
       throw new IOException();
@@ -913,9 +913,9 @@ public final class CarbonDataMergerUtil {
 
   private static boolean isSegmentValid(LoadMetadataDetails seg) {
     return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-            || seg.getLoadStatus()
-            .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
-            .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
+        || seg.getLoadStatus()
+        .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
+        .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
   }
 
   /**
@@ -1200,16 +1200,16 @@ public final class CarbonDataMergerUtil {
     CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
     try {
       deleteDeltaBlockDetails =
-              dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
+          dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
     } catch (Exception e) {
       String blockFilePath = fullBlockFilePath
-              .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+          .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
       LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath);
       throw new IOException();
     }
     CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
-            new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
-                    FileFactory.getFileType(fullBlockFilePath));
+        new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
+            FileFactory.getFileType(fullBlockFilePath));
     try {
       carbonDeleteWriter.write(deleteDeltaBlockDetails);
     } catch (IOException e) {
@@ -1220,11 +1220,11 @@ public final class CarbonDataMergerUtil {
   }
 
   public static Boolean updateStatusFile(
-          List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
-          String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
+      List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
+      String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
 
     List<SegmentUpdateDetails> segmentUpdateDetails =
-            new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
+        new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
 
 
     // Check the list output.
@@ -1235,10 +1235,10 @@ public final class CarbonDataMergerUtil {
         tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());
 
         for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
-                .getUpdateStatusDetails()) {
+            .getUpdateStatusDetails()) {
           if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
-                  && origDetails.getSegmentName()
-                  .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
+              && origDetails.getSegmentName()
+              .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
 
             tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
             tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
@@ -1247,9 +1247,9 @@ public final class CarbonDataMergerUtil {
         }
 
         tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
-                carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
+            carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
         tempSegmentUpdateDetails
-              .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
+            .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
 
         segmentUpdateDetails.add(tempSegmentUpdateDetails);
       } else return false;
@@ -1262,8 +1262,8 @@ public final class CarbonDataMergerUtil {
     AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
 
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
 
@@ -1277,38 +1277,38 @@ public final class CarbonDataMergerUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
-                        + " for table status updation");
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+            segmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
             loadMetadata.setUpdateStatusFileName(
-                    CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+                CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
           }
         }
         try {
           segmentStatusManager
-                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+              .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;
         }
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+            .getDatabaseName() + "." + table.getFactTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
-                 "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
+              "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                  + "." + table.getFactTableName());
         } else {
           LOGGER.error(
-                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
+              "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+                  .getFactTableName() + " during table status updation");
         }
       }
     }
@@ -1326,7 +1326,7 @@ public final class CarbonDataMergerUtil {
 
     String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
     AbsoluteTableIdentifier absoluteTableIdentifier =
-            model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+        model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
     List<LoadMetadataDetails> originalList = Arrays.asList(details);
@@ -1340,24 +1340,24 @@ public final class CarbonDataMergerUtil {
 
 
     ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
-            model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
-            LockUsage.TABLE_STATUS_LOCK);
+        model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
 
     try {
       if (carbonTableStatusLock.lockWithRetries()) {
         LOGGER.info(
             "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
-                        + " for table status updation ");
+                + " for table status updation ");
         CarbonTablePath carbonTablePath = CarbonStorePath
-                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                        absoluteTableIdentifier.getCarbonTableIdentifier());
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
 
         segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
-                originalList.toArray(new LoadMetadataDetails[originalList.size()]));
+            originalList.toArray(new LoadMetadataDetails[originalList.size()]));
       } else {
         LOGGER.error(
-                "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
-                        .getTableName() + "for table status updation");
+            "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
+                .getTableName() + "for table status updation");
         throw new Exception("Failed to update the MajorCompactionStatus.");
       }
     } catch (IOException e) {
@@ -1366,11 +1366,11 @@ public final class CarbonDataMergerUtil {
     } finally {
       if (carbonTableStatusLock.unlock()) {
         LOGGER.info(
-                "Table unlocked successfully after table status updation" + model.getDatabaseName()
-                        + "." + model.getTableName());
+            "Table unlocked successfully after table status updation" + model.getDatabaseName()
+                + "." + model.getTableName());
       } else {
         LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
-                .getTableName() + " during table status updation");
+            .getTableName() + " during table status updation");
       }
     }
 



[2/2] incubator-carbondata git commit: [CARBONDATA-882] Add SORT_COLUMNS support in dataframe writer This closes #737

Posted by ja...@apache.org.
[CARBONDATA-882] Add SORT_COLUMNS support in dataframe writer This closes #737


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ebf70bca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ebf70bca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ebf70bca

Branch: refs/heads/12-dev
Commit: ebf70bca834ab9145191fe43618cbb1ab6875941
Parents: 8843aec be5904f
Author: jackylk <ja...@huawei.com>
Authored: Tue Apr 18 20:56:01 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Apr 18 20:56:01 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/SegmentTaskIndexStore.java   |  17 ---
 .../core/mutate/CarbonUpdateUtil.java           | 122 +++++++++----------
 .../statusmanager/SegmentStatusManager.java     |  17 +--
 .../carbondata/examples/CompareTest.scala       |   2 +
 .../carbondata/hadoop/CarbonInputSplit.java     |  10 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  28 +++++
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  12 +-
 .../processing/merger/CarbonDataMergerUtil.java |  82 ++++++-------
 9 files changed, 148 insertions(+), 144 deletions(-)
----------------------------------------------------------------------