You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/05 16:35:36 UTC

carbondata git commit: [CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable

Repository: carbondata
Updated Branches:
  refs/heads/master 4e3151087 -> afe96a241


[CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable

Refactor and optimize SortRowStepUtil to make it efficient and more readable.

Firstly we get all the indices for the 3 groups: dictionary columns, non dictionary dimension columns and measures;
Then for each group, just iterate the source row and copy data to each group without any if-else branch.

This closes #1594


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afe96a24
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afe96a24
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afe96a24

Branch: refs/heads/master
Commit: afe96a241d9ac3c11471061da1c4e20dc08f59db
Parents: 4e31510
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Nov 29 20:40:44 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 00:34:03 2017 +0800

----------------------------------------------------------------------
 .../load/DataLoadProcessorStepOnSpark.scala     |  4 +-
 .../loading/sort/SortStepRowUtil.java           | 91 +++++++++++++-------
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  5 +-
 3 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 574fb8a..c28426d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -128,7 +128,7 @@ object DataLoadProcessorStepOnSpark {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val sortParameters = SortParameters.createSortParameters(conf)
-
+    val sortStepRowUtil = new SortStepRowUtil(sortParameters)
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
       wrapException(e, model)
     }
@@ -138,7 +138,7 @@ object DataLoadProcessorStepOnSpark {
 
       override def next(): CarbonRow = {
         val row =
-          new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
+          new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
         rowCounter.add(1)
         row
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
index 9665487..c4e4756 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -21,53 +21,82 @@ import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 
 public class SortStepRowUtil {
-  public static Object[] convertRow(Object[] data, SortParameters parameters) {
-    int measureCount = parameters.getMeasureColCount();
-    int dimensionCount = parameters.getDimColCount();
-    int complexDimensionCount = parameters.getComplexDimColCount();
-    int noDictionaryCount = parameters.getNoDictionaryCount();
-    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+  private int measureCount;
+  private int dimensionCount;
+  private int complexDimensionCount;
+  private int noDictionaryCount;
+  private int[] dictDimIdx;
+  private int[] nonDictIdx;
+  private int[] measureIdx;
 
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+  public SortStepRowUtil(SortParameters parameters) {
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
 
-    Object[] holder = new Object[3];
     int index = 0;
     int nonDicIndex = 0;
     int allCount = 0;
-    int[] dim = new int[dimensionCount];
-    byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
-    Object[] measures = new Object[measureCount];
-    try {
-      // read dimension values
-      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-        if (isNoDictionaryDimensionColumn[i]) {
-          nonDicArray[nonDicIndex++] = (byte[]) data[i];
-        } else {
-          dim[index++] = (int) data[allCount];
-        }
-        allCount++;
-      }
 
-      for (int i = 0; i < complexDimensionCount; i++) {
-        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
-        allCount++;
+    // be careful that the default value is 0
+    this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
+    this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
+    this.measureIdx = new int[measureCount];
+
+    // indices for dict dim columns
+    for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+      if (isNoDictionaryDimensionColumn[i]) {
+        nonDictIdx[nonDicIndex++] = i;
+      } else {
+        dictDimIdx[index++] = allCount;
       }
+      allCount++;
+    }
 
-      index = 0;
+    // indices for non dict dim/complex columns
+    for (int i = 0; i < complexDimensionCount; i++) {
+      nonDictIdx[nonDicIndex++] = allCount;
+      allCount++;
+    }
 
-      // read measure values
-      for (int i = 0; i < measureCount; i++) {
-        measures[index++] = data[allCount];
-        allCount++;
+    // indices for measure columns
+    for (int i = 0; i < measureCount; i++) {
+      measureIdx[i] = allCount;
+      allCount++;
+    }
+  }
+
+  public Object[] convertRow(Object[] data) {
+    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+    Object[] holder = new Object[3];
+    try {
+
+      int[] dictDims = new int[dimensionCount - noDictionaryCount];
+      byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
+      Object[] measures = new Object[measureCount];
+
+      // write dict dim data
+      for (int idx = 0; idx < dictDimIdx.length; idx++) {
+        dictDims[idx] = (int) data[dictDimIdx[idx]];
       }
 
-      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+      // write non dict dim data
+      for (int idx = 0; idx < nonDictIdx.length; idx++) {
+        nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
+      }
+
+      // write measure data
+      for (int idx = 0; idx < measureIdx.length; idx++) {
+        measures[idx] = data[measureIdx[idx]];
+      }
+      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
 
       // increment number if record read
     } catch (Exception e) {
       throw new RuntimeException("Problem while converting row ", e);
     }
-
     //return out row
     return holder;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index eb38efe..ce118d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
 
   private SortParameters parameters;
-
+  private SortStepRowUtil sortStepRowUtil;
   /**
    * tempFileLocation
    */
@@ -68,6 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String[] tempFileLocation) {
     this.parameters = parameters;
+    this.sortStepRowUtil = new SortStepRowUtil(parameters);
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }
@@ -184,7 +185,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    * @return sorted row
    */
   public Object[] next() {
-    return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
+    return sortStepRowUtil.convertRow(getSortedRecordFromFile());
   }
 
   /**