You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/05 16:35:36 UTC
carbondata git commit: [CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable
Repository: carbondata
Updated Branches:
refs/heads/master 4e3151087 -> afe96a241
[CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable
Refactor and optimize SortStepRowUtil to make it more efficient and more readable.
First, compute the source-row indices for the three column groups once, in the constructor: dictionary dimension columns, non-dictionary dimension columns (including complex columns), and measures.
Then, for each row, iterate over each group's precomputed indices and copy the corresponding values from the source row, without any per-column if-else branch.
This closes #1594
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afe96a24
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afe96a24
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afe96a24
Branch: refs/heads/master
Commit: afe96a241d9ac3c11471061da1c4e20dc08f59db
Parents: 4e31510
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Nov 29 20:40:44 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 00:34:03 2017 +0800
----------------------------------------------------------------------
.../load/DataLoadProcessorStepOnSpark.scala | 4 +-
.../loading/sort/SortStepRowUtil.java | 91 +++++++++++++-------
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 +-
3 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 574fb8a..c28426d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -128,7 +128,7 @@ object DataLoadProcessorStepOnSpark {
val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val sortParameters = SortParameters.createSortParameters(conf)
-
+ val sortStepRowUtil = new SortStepRowUtil(sortParameters)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
wrapException(e, model)
}
@@ -138,7 +138,7 @@ object DataLoadProcessorStepOnSpark {
override def next(): CarbonRow = {
val row =
- new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
+ new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
rowCounter.add(1)
row
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
index 9665487..c4e4756 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -21,53 +21,82 @@ import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
public class SortStepRowUtil {
- public static Object[] convertRow(Object[] data, SortParameters parameters) {
- int measureCount = parameters.getMeasureColCount();
- int dimensionCount = parameters.getDimColCount();
- int complexDimensionCount = parameters.getComplexDimColCount();
- int noDictionaryCount = parameters.getNoDictionaryCount();
- boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+ private int measureCount;
+ private int dimensionCount;
+ private int complexDimensionCount;
+ private int noDictionaryCount;
+ private int[] dictDimIdx;
+ private int[] nonDictIdx;
+ private int[] measureIdx;
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+ public SortStepRowUtil(SortParameters parameters) {
+ this.measureCount = parameters.getMeasureColCount();
+ this.dimensionCount = parameters.getDimColCount();
+ this.complexDimensionCount = parameters.getComplexDimColCount();
+ this.noDictionaryCount = parameters.getNoDictionaryCount();
+ boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
- Object[] holder = new Object[3];
int index = 0;
int nonDicIndex = 0;
int allCount = 0;
- int[] dim = new int[dimensionCount];
- byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
- Object[] measures = new Object[measureCount];
- try {
- // read dimension values
- for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
- if (isNoDictionaryDimensionColumn[i]) {
- nonDicArray[nonDicIndex++] = (byte[]) data[i];
- } else {
- dim[index++] = (int) data[allCount];
- }
- allCount++;
- }
- for (int i = 0; i < complexDimensionCount; i++) {
- nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
- allCount++;
+ // be careful that the default value is 0
+ this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
+ this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
+ this.measureIdx = new int[measureCount];
+
+ // indices for dict dim columns
+ for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+ if (isNoDictionaryDimensionColumn[i]) {
+ nonDictIdx[nonDicIndex++] = i;
+ } else {
+ dictDimIdx[index++] = allCount;
}
+ allCount++;
+ }
- index = 0;
+ // indices for non dict dim/complex columns
+ for (int i = 0; i < complexDimensionCount; i++) {
+ nonDictIdx[nonDicIndex++] = allCount;
+ allCount++;
+ }
- // read measure values
- for (int i = 0; i < measureCount; i++) {
- measures[index++] = data[allCount];
- allCount++;
+ // indices for measure columns
+ for (int i = 0; i < measureCount; i++) {
+ measureIdx[i] = allCount;
+ allCount++;
+ }
+ }
+
+ public Object[] convertRow(Object[] data) {
+ // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
+ Object[] holder = new Object[3];
+ try {
+
+ int[] dictDims = new int[dimensionCount - noDictionaryCount];
+ byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
+ Object[] measures = new Object[measureCount];
+
+ // write dict dim data
+ for (int idx = 0; idx < dictDimIdx.length; idx++) {
+ dictDims[idx] = (int) data[dictDimIdx[idx]];
}
- NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
+ // write non dict dim data
+ for (int idx = 0; idx < nonDictIdx.length; idx++) {
+ nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
+ }
+
+ // write measure data
+ for (int idx = 0; idx < measureIdx.length; idx++) {
+ measures[idx] = data[measureIdx[idx]];
+ }
+ NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
// increment number if record read
} catch (Exception e) {
throw new RuntimeException("Problem while converting row ", e);
}
-
//return out row
return holder;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index eb38efe..ce118d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -55,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
private SortParameters parameters;
-
+ private SortStepRowUtil sortStepRowUtil;
/**
* tempFileLocation
*/
@@ -68,6 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
String[] tempFileLocation) {
this.parameters = parameters;
+ this.sortStepRowUtil = new SortStepRowUtil(parameters);
this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
@@ -184,7 +185,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
* @return sorted row
*/
public Object[] next() {
- return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
+ return sortStepRowUtil.convertRow(getSortedRecordFromFile());
}
/**