You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/27 15:11:24 UTC
[carbondata] branch master updated: [CARBONDATA-3333]Fixed No Sort
Store Size issue and Compatibility issue after alter added column done in
1.1 and load in 1.5
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7e3bd [CARBONDATA-3333]Fixed No Sort Store Size issue and Compatibility issue after alter added column done in 1.1 and load in 1.5
4a7e3bd is described below
commit 4a7e3bdde32d56288878967e29b8761078cb640a
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Tue Mar 26 22:46:01 2019 +0530
[CARBONDATA-3333]Fixed No Sort Store Size issue and Compatibility issue after alter added column done in 1.1 and load in 1.5
Issue 1: Load fails in the latest version when the table was altered (column added) in an older version
This is because the table spec was not created based on sort column order, and re-arranging the column pages back into schema order was not handled while writing
Issue 2: Store size increased after PR#3140
Store size increased after PR#3140, and because of this query performance degraded. This PR reverts the changes made in PR#3140
---
.../carbondata/core/datastore/TableSpec.java | 58 ++++++++++++++------
.../CarbonRowDataWriterProcessorStepImpl.java | 61 ++++++++++++----------
.../carbondata/processing/store/TablePage.java | 49 ++++++++++++++---
3 files changed, 117 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index a26d6ae..002104a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -71,30 +71,52 @@ public class TableSpec {
}
private void addDimensions(List<CarbonDimension> dimensions) {
- int dimIndex = 0;
+ List<DimensionSpec> sortDimSpec = new ArrayList<>();
+ List<DimensionSpec> noSortDimSpec = new ArrayList<>();
+ List<DimensionSpec> noSortNoDictDimSpec = new ArrayList<>();
+ List<DimensionSpec> sortNoDictDimSpec = new ArrayList<>();
+ DimensionSpec spec;
+ short actualPosition = 0;
+ // sort step's output is based on sort column order i.e sort columns data will be present
+ // ahead of non sort columns, so table spec also need to add dimension spec in same manner
for (int i = 0; i < dimensions.size(); i++) {
CarbonDimension dimension = dimensions.get(i);
if (dimension.isComplex()) {
- DimensionSpec spec = new DimensionSpec(ColumnType.COMPLEX, dimension);
- dimensionSpec[dimIndex++] = spec;
- noDictionaryDimensionSpec.add(spec);
+ spec = new DimensionSpec(ColumnType.COMPLEX, dimension, actualPosition++);
} else if (dimension.getDataType() == DataTypes.TIMESTAMP && !dimension
.isDirectDictionaryEncoding()) {
- DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
- dimensionSpec[dimIndex++] = spec;
- noDictionaryDimensionSpec.add(spec);
+ spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
} else if (dimension.isDirectDictionaryEncoding()) {
- DimensionSpec spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension);
- dimensionSpec[dimIndex++] = spec;
+ spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension, actualPosition++);
} else if (dimension.isGlobalDictionaryEncoding()) {
- DimensionSpec spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension);
- dimensionSpec[dimIndex++] = spec;
+ spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension, actualPosition++);
} else {
- DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
- dimensionSpec[dimIndex++] = spec;
- noDictionaryDimensionSpec.add(spec);
+ spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
+ }
+ if (dimension.isSortColumn()) {
+ sortDimSpec.add(spec);
+ if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
+ || spec.getColumnType() == ColumnType.COMPLEX) {
+ sortNoDictDimSpec.add(spec);
+ }
+ } else {
+ noSortDimSpec.add(spec);
+ if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
+ || spec.getColumnType() == ColumnType.COMPLEX) {
+ noSortNoDictDimSpec.add(spec);
+ }
}
}
+ // combine the result
+ final DimensionSpec[] sortDimensionSpecs =
+ sortDimSpec.toArray(new DimensionSpec[sortDimSpec.size()]);
+ final DimensionSpec[] noSortDimensionSpecs =
+ noSortDimSpec.toArray(new DimensionSpec[noSortDimSpec.size()]);
+ System.arraycopy(sortDimensionSpecs, 0, dimensionSpec, 0, sortDimensionSpecs.length);
+ System.arraycopy(noSortDimensionSpecs, 0, dimensionSpec, sortDimensionSpecs.length,
+ noSortDimensionSpecs.length);
+ noDictionaryDimensionSpec.addAll(sortNoDictDimSpec);
+ noDictionaryDimensionSpec.addAll(noSortNoDictDimSpec);
}
private void addMeasures(List<CarbonMeasure> measures) {
@@ -255,10 +277,13 @@ public class TableSpec {
// indicate whether this dimension need to do inverted index
private boolean doInvertedIndex;
- DimensionSpec(ColumnType columnType, CarbonDimension dimension) {
+ // indicate the actual postion in blocklet
+ private short actualPostion;
+ DimensionSpec(ColumnType columnType, CarbonDimension dimension, short actualPostion) {
super(dimension.getColName(), dimension.getDataType(), columnType);
this.inSortColumns = dimension.isSortColumn();
this.doInvertedIndex = dimension.isUseInvertedIndex();
+ this.actualPostion = actualPostion;
}
public boolean isInSortColumns() {
@@ -269,6 +294,9 @@ public class TableSpec {
return doInvertedIndex;
}
+ public short getActualPostion() {
+ return actualPostion;
+ }
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 6345035..25f7cfb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.processing.loading.steps;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -81,17 +83,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
- private CarbonFactHandler dataHandler;
+ private List<CarbonFactHandler> carbonFactHandlers;
private ExecutorService executorService = null;
- private static final Object lock = new Object();
-
public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
super(configuration, child);
this.localDictionaryGeneratorMap =
CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
+ this.carbonFactHandlers = new CopyOnWriteArrayList<>();
}
@Override public void initialize() throws IOException {
@@ -128,31 +129,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
System.currentTimeMillis());
- //Creating a Instance of CarbonFacthandler that will be passed to all the threads
- String[] storeLocation = getStoreLocation();
- DataMapWriterListener listener = getDataMapWriterListener(0);
- CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
- model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
- dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
- dataHandler.initialise();
-
if (iterators.length == 1) {
- doExecute(iterators[0], 0, dataHandler);
+ doExecute(iterators[0], 0);
} else {
executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
.getCarbonTableIdentifier().getTableName(), true));
Future[] futures = new Future[iterators.length];
for (int i = 0; i < iterators.length; i++) {
- futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i, dataHandler));
+ futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i));
}
for (Future future : futures) {
future.get();
}
}
- finish(dataHandler, 0);
- dataHandler = null;
} catch (CarbonDataWriterException e) {
LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e);
throw new CarbonDataLoadingException(
@@ -167,15 +157,31 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
return null;
}
- private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
- CarbonFactHandler dataHandler) throws IOException {
+ private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
+ String[] storeLocation = getStoreLocation();
+ DataMapWriterListener listener = getDataMapWriterListener(0);
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
+ configuration, storeLocation, 0, iteratorIndex, listener);
+ model.setColumnLocalDictGenMap(localDictionaryGeneratorMap);
+ CarbonFactHandler dataHandler = null;
boolean rowsNotExist = true;
while (iterator.hasNext()) {
if (rowsNotExist) {
rowsNotExist = false;
+ dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+ this.carbonFactHandlers.add(dataHandler);
+ dataHandler.initialise();
}
processBatch(iterator.next(), dataHandler, iteratorIndex);
}
+ try {
+ if (!rowsNotExist) {
+ finish(dataHandler, iteratorIndex);
+ }
+ } finally {
+ carbonFactHandlers.remove(dataHandler);
+ }
+
}
@@ -300,9 +306,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
while (batch.hasNext()) {
CarbonRow row = batch.next();
CarbonRow converted = convertRow(row);
- synchronized (lock) {
- dataHandler.addDataToStore(converted);
- }
+ dataHandler.addDataToStore(converted);
readCounter[iteratorIndex]++;
}
writeCounter[iteratorIndex] += batch.getSize();
@@ -316,18 +320,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
private Iterator<CarbonRowBatch> iterator;
private int iteratorIndex = 0;
- private CarbonFactHandler dataHandler = null;
- DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex,
- CarbonFactHandler dataHandler) {
+ DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
this.iterator = iterator;
this.iteratorIndex = iteratorIndex;
- this.dataHandler = dataHandler;
}
@Override public void run() {
try {
- doExecute(this.iterator, iteratorIndex, dataHandler);
+ doExecute(this.iterator, iteratorIndex);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
throw new RuntimeException(e);
@@ -341,9 +342,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
if (null != executorService) {
executorService.shutdownNow();
}
- if (null != dataHandler) {
- dataHandler.finish();
- dataHandler.closeHandler();
+ if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
+ for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
+ carbonFactHandler.finish();
+ carbonFactHandler.closeHandler();
+ }
}
}
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 7cc8932..5687549 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -393,12 +392,14 @@ public class TablePage {
private EncodedColumnPage[] encodeAndCompressDimensions()
throws KeyGenException, IOException, MemoryException {
List<EncodedColumnPage> encodedDimensions = new ArrayList<>();
- List<EncodedColumnPage> encodedComplexDimensions = new ArrayList<>();
+ EncodedColumnPage[][] complexColumnPages =
+ new EncodedColumnPage[complexDimensionPages.length][];
TableSpec tableSpec = model.getTableSpec();
int dictIndex = 0;
int noDictIndex = 0;
int complexDimIndex = 0;
int numDimensions = tableSpec.getNumDimensions();
+ int totalComplexColumnSize = 0;
for (int i = 0; i < numDimensions; i++) {
ColumnPageEncoder columnPageEncoder;
EncodedColumnPage encodedPage;
@@ -434,17 +435,51 @@ public class TablePage {
break;
case COMPLEX:
EncodedColumnPage[] encodedPages = ColumnPageEncoder.encodeComplexColumn(
- complexDimensionPages[complexDimIndex++]);
- encodedComplexDimensions.addAll(Arrays.asList(encodedPages));
+ complexDimensionPages[complexDimIndex]);
+ complexColumnPages[complexDimIndex] = encodedPages;
+ totalComplexColumnSize += encodedPages.length;
+ complexDimIndex++;
break;
default:
throw new IllegalArgumentException("unsupported dimension type:" + spec
.getColumnType());
}
}
-
- encodedDimensions.addAll(encodedComplexDimensions);
- return encodedDimensions.toArray(new EncodedColumnPage[encodedDimensions.size()]);
+ // below code is to combine the list based on actual order present in carbon table
+ // in case of older version(eg:1.1) alter add column was supported only with sort columns
+ // and sort step will return the data based on sort column order(sort columns first)
+ // so arranging the column pages based on schema is required otherwise query will
+ // either give wrong result(for string columns) or throw exception in case of non string
+ // column as reading is based on schema order
+ int complexEncodedPageIndex = 0;
+ int normalEncodedPageIndex = 0;
+ int currentPosition = 0;
+ EncodedColumnPage[] combinedList =
+ new EncodedColumnPage[encodedDimensions.size() + totalComplexColumnSize];
+ for (int i = 0; i < numDimensions; i++) {
+ TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
+ switch (spec.getColumnType()) {
+ case GLOBAL_DICTIONARY:
+ case DIRECT_DICTIONARY:
+ case PLAIN_VALUE:
+ // add the dimension based on actual postion
+ // current position is considered as complex column will have multiple children
+ combinedList[currentPosition + spec.getActualPostion()] =
+ encodedDimensions.get(normalEncodedPageIndex++);
+ break;
+ case COMPLEX:
+ EncodedColumnPage[] complexColumnPage = complexColumnPages[complexEncodedPageIndex++];
+ for (int j = 0; j < complexColumnPage.length; j++) {
+ combinedList[currentPosition + spec.getActualPostion() + j] = complexColumnPage[j];
+ }
+ // as for complex type 1 position is already considered, so subtract -1
+ currentPosition += complexColumnPage.length - 1;
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported dimension type:" + spec.getColumnType());
+ }
+ }
+ return combinedList;
}
/**