You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/04 13:21:10 UTC
[1/2] incubator-carbondata git commit: Added unsafe sort for
bucketing feature
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 061c3c2e9 -> 65367ef63
Added unsafe sort for bucketing feature
Rebased
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f82b10b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f82b10b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f82b10b4
Branch: refs/heads/master
Commit: f82b10b4189a82922b02d784effa0099681badf7
Parents: 061c3c2
Author: ravipesala <ra...@gmail.com>
Authored: Sun Mar 26 19:12:24 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Thu May 4 21:20:36 2017 +0800
----------------------------------------------------------------------
.../bucketing/TableBucketingTestCase.scala | 24 ++
.../processing/newflow/sort/SorterFactory.java | 73 +++++
...arallelReadMergeSorterWithBucketingImpl.java | 18 +-
.../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 6 +-
...arallelReadMergeSorterWithBucketingImpl.java | 264 +++++++++++++++++++
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 36 ++-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 +-
...ConverterProcessorWithBucketingStepImpl.java | 3 +-
.../newflow/steps/SortProcessorStepImpl.java | 25 +-
10 files changed, 423 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 8d3eed7..2731812 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -45,6 +45,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS t7")
sql("DROP TABLE IF EXISTS t8")
sql("DROP TABLE IF EXISTS t9")
+ sql("DROP TABLE IF EXISTS t10")
}
test("test create table with buckets") {
@@ -66,6 +67,27 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
}
}
+ test("test create table with buckets unsafe") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+ sql(
+ """
+ CREATE TABLE t10
+ (ID Int, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int)
+ USING org.apache.spark.sql.CarbonSource
+ OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t10")
+ """)
+ LoadTable(Some("default"), "t10", s"$resourcesPath/source.csv", Nil,
+ Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+ val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t10")
+ if (table != null && table.getBucketingInfo("t10") != null) {
+ assert(true)
+ } else {
+ assert(false, "Bucketing info does not exist")
+ }
+ }
+
test("must be unable to create if number of buckets is in negative number") {
try {
sql(
@@ -215,6 +237,8 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS t6")
sql("DROP TABLE IF EXISTS t7")
sql("DROP TABLE IF EXISTS t8")
+ sql("DROP TABLE IF EXISTS t9")
+ sql("DROP TABLE IF EXISTS t10")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
new file mode 100644
index 0000000..60cca69
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
+
+public class SorterFactory {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(SorterFactory.class.getName());
+
+ public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
+ boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
+ boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
+ CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
+ Sorter sorter;
+ if (offheapsort) {
+ if (configuration.getBucketingInfo() != null) {
+ sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
+ configuration.getBucketingInfo());
+ } else {
+ sorter = new UnsafeParallelReadMergeSorterImpl(counter);
+ }
+ } else {
+ if (configuration.getBucketingInfo() != null) {
+ sorter =
+ new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
+ } else {
+ sorter = new ParallelReadMergeSorterImpl(counter);
+ }
+ }
+ if (batchSort) {
+ if (configuration.getBucketingInfo() == null) {
+ sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
+ } else {
+ LOGGER.warn(
+ "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
+ .getName());
+ }
+ }
+ return sorter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e5af1c6..cb1b8fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -57,10 +57,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
private SortParameters sortParameters;
- private SortIntermediateFileMerger intermediateFileMerger;
-
private ExecutorService executorService;
+ private SortIntermediateFileMerger[] intermediateFileMergers;
+
private BucketingInfo bucketingInfo;
private int sortBufferSize;
@@ -75,7 +75,6 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
@Override public void initialize(SortParameters sortParameters) {
this.sortParameters = sortParameters;
- intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
int buffer = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
@@ -87,13 +86,16 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
@Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throws CarbonDataLoadingException {
SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+ intermediateFileMergers =
+ new SortIntermediateFileMerger[sortDataRows.length];
try {
for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
SortParameters parameters = sortParameters.getCopy();
parameters.setPartitionID(i + "");
setTempLocation(parameters);
parameters.setBufferSize(sortBufferSize);
- sortDataRows[i] = new SortDataRows(parameters, intermediateFileMerger);
+ intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
+ sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
sortDataRows[i].initialize();
}
} catch (CarbonSortKeyAndGroupByException e) {
@@ -116,7 +118,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
}
checkError();
try {
- intermediateFileMerger.finish();
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].finish();
+ }
} catch (CarbonDataWriterException e) {
throw new CarbonDataLoadingException(e);
} catch (CarbonSortKeyAndGroupByException e) {
@@ -148,7 +152,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
}
@Override public void close() {
- intermediateFileMerger.close();
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].close();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index a54410c..0c6fa27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -196,9 +196,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
}
private void createSortDataRows() {
- this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+ int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ this.finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+ sortParameters.getTempFileLocation());
unsafeIntermediateFileMerger = new UnsafeIntermediateMerger(sortParameters);
- sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+ sortDataRow = new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger,
+ inMemoryChunkSizeInMB);
try {
sortDataRow.initialize();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 0caafec..503f92a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -78,13 +78,15 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
// Set the data file location
String dataFolderLocation =
storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
- finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters);
+ finalMerger = new UnsafeSingleThreadFinalSortFilesMerger(sortParameters,
+ sortParameters.getTempFileLocation());
}
@Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throws CarbonDataLoadingException {
+ int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
UnsafeSortDataRows sortDataRow =
- new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger);
+ new UnsafeSortDataRows(sortParameters, unsafeIntermediateFileMerger, inMemoryChunkSizeInMB);
final int batchSize = CarbonProperties.getInstance().getBatchSize();
try {
sortDataRow.initialize();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..43abf66
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data in parallel from an array of iterators and does a merge sort.
+ * First it sorts the data and writes it to temp files. These temp files will be merge sorted to
+ * get the final merge sort result.
+ * This step is specifically for bucketing: it sorts each bucket's data separately and writes it
+ * to temp files.
+ */
+public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+
+ private SortParameters sortParameters;
+
+ private ExecutorService executorService;
+
+ private BucketingInfo bucketingInfo;
+
+ private DataField[] inputDataFields;
+
+ public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+ BucketingInfo bucketingInfo) {
+ this.inputDataFields = inputDataFields;
+ this.bucketingInfo = bucketingInfo;
+ }
+
+ @Override public void initialize(SortParameters sortParameters) {
+ this.sortParameters = sortParameters;
+ int buffer = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
+ UnsafeIntermediateMerger[] intermediateFileMergers =
+ new UnsafeIntermediateMerger[sortDataRows.length];
+ int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
+ if (inMemoryChunkSizeInMB < 5) {
+ inMemoryChunkSizeInMB = 5;
+ }
+ try {
+ for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+ SortParameters parameters = sortParameters.getCopy();
+ parameters.setPartitionID(i + "");
+ setTempLocation(parameters);
+ intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+ sortDataRows[i] =
+ new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+ sortDataRows[i].initialize();
+ }
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ this.executorService = Executors.newFixedThreadPool(iterators.length);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.submit(new SortIteratorThread(iterators[i], sortDataRows));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRows, sortParameters);
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ try {
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].finish();
+ }
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+
+ Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+ for (int i = 0; i < sortDataRows.length; i++) {
+ batchIterator[i] =
+ new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+ }
+
+ return batchIterator;
+ }
+
+ private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+ String storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()), bucketId,
+ sortParameters.getSegmentId() + "", false);
+ // Set the data file location
+ String dataFolderLocation =
+ storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+ return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+ }
+
+ @Override public void close() {
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows || sortDataRows.length == 0) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ for (int i = 0; i < sortDataRows.length; i++) {
+ // start sorting
+ sortDataRows[i].startSorting();
+ }
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ return false;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ private void setTempLocation(SortParameters parameters) {
+ String carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+ parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false);
+ parameters.setTempFileLocation(
+ carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ }
+
+ /**
+ * This thread iterates over the iterator and adds the rows to {@link UnsafeSortDataRows}.
+ */
+ private static class SortIteratorThread implements Callable<Void> {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private UnsafeSortDataRows[] sortDataRows;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+ UnsafeSortDataRows[] sortDataRows) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ }
+
+ @Override public Void call() throws CarbonDataLoadingException {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+ synchronized (sortDataRow) {
+ sortDataRow.addRow(row.getData());
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw new CarbonDataLoadingException(e);
+ }
+ return null;
+ }
+
+ }
+
+ private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private String partitionId;
+
+ private int batchSize;
+
+ private boolean firstRow;
+
+ private UnsafeIntermediateMerger intermediateMerger;
+
+ public MergedDataIterator(String partitionId, int batchSize,
+ UnsafeIntermediateMerger intermediateMerger) {
+ this.partitionId = partitionId;
+ this.batchSize = batchSize;
+ this.intermediateMerger = intermediateMerger;
+ this.firstRow = true;
+ }
+
+ private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+ @Override public boolean hasNext() {
+ if (firstRow) {
+ firstRow = false;
+ finalMerger = getFinalMerger(partitionId);
+ List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+ finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+ intermediateMerger.getMergedPages());
+ }
+ return finalMerger.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ int counter = 0;
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+ while (finalMerger.hasNext() && counter < batchSize) {
+ rowBatch.addRow(new CarbonRow(finalMerger.next()));
+ counter++;
+ }
+ return rowBatch;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 3afd3b0..df3825a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -84,7 +84,7 @@ public class UnsafeSortDataRows {
private Semaphore semaphore;
public UnsafeSortDataRows(SortParameters parameters,
- UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger) {
+ UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
this.parameters = parameters;
this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
@@ -92,7 +92,7 @@ public class UnsafeSortDataRows {
// observer of writing file in thread
this.threadStatusObserver = new ThreadStatusObserver();
- this.inMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ this.inMemoryChunkSize = inMemoryChunkSize;
this.inMemoryChunkSize = this.inMemoryChunkSize * 1024 * 1024;
enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
@@ -193,6 +193,38 @@ public class UnsafeSortDataRows {
}
/**
+ * This method will be used to add a new row.
+ */
+ public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ if (rowPage.canAdd()) {
+ rowPage.addRow(row);
+ } else {
+ try {
+ if (enableInMemoryIntermediateMerge) {
+ unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible();
+ }
+ unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
+ semaphore.acquire();
+ dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
+ MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+ boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
+ rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getDimColCount(), parameters.getMeasureColCount(),
+ parameters.getAggType(), memoryBlock,
+ saveToDisk);
+ rowPage.addRow(row);
+ } catch (Exception e) {
+ LOGGER.error(
+ "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+
+ }
+ }
+
+ /**
* Below method will be used to start storing process This method will get
* all the temp files present in sort temp folder then it will create the
* record holder heap and then it will read first record from each file and
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 95a337a..cd6b321 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -82,7 +82,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
private boolean isStopProcess;
- public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
+ public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
+ String tempFileLocation) {
this.parameters = parameters;
// set measure and dimension count
this.measureCount = parameters.getMeasureColCount();
@@ -91,7 +92,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
this.noDictionaryCount = parameters.getNoDictionaryCount();
this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
- this.tempFileLocation = parameters.getTempFileLocation();
+ this.tempFileLocation = tempFileLocation;
this.tableName = parameters.getTableName();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 86971c3..78df028 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -128,8 +128,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
while (rowBatch.hasNext()) {
CarbonRow next = rowBatch.next();
+ short bucketNumber = (short) partitioner.getPartition(next.getData());
CarbonRow convertRow = localConverter.convert(next);
- convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
+ convertRow.bucketNumber = bucketNumber;
newBatch.addRow(convertRow);
}
rowCounter.getAndAdd(newBatch.getSize());
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f82b10b4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index 17cc01e..698459c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.processing.newflow.steps;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
@@ -28,10 +26,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
import org.apache.carbondata.processing.newflow.sort.Sorter;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.SorterFactory;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
/**
@@ -56,23 +51,7 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
public void initialize() throws IOException {
child.initialize();
SortParameters sortParameters = SortParameters.createSortParameters(configuration);
- boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
- boolean batchSort = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT,
- CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
- if (batchSort) {
- sorter = new UnsafeBatchParallelReadMergeSorterImpl(rowCounter);
- } else if (offheapsort) {
- sorter = new UnsafeParallelReadMergeSorterImpl(rowCounter);
- } else {
- sorter = new ParallelReadMergeSorterImpl(rowCounter);
- }
- if (configuration.getBucketingInfo() != null) {
- sorter = new ParallelReadMergeSorterWithBucketingImpl(rowCounter,
- configuration.getBucketingInfo());
- }
+ sorter = SorterFactory.createSorter(configuration, rowCounter);
sorter.initialize(sortParameters);
}
[2/2] incubator-carbondata git commit: [CARBONDATA-822] Added unsafe
sort for bucketing feature This closes #701
Posted by ja...@apache.org.
[CARBONDATA-822] Added unsafe sort for bucketing feature This closes #701
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/65367ef6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/65367ef6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/65367ef6
Branch: refs/heads/master
Commit: 65367ef63268bf7bcfd86a740f688b03bd67b484
Parents: 061c3c2 f82b10b
Author: jackylk <ja...@huawei.com>
Authored: Thu May 4 21:20:58 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Thu May 4 21:20:58 2017 +0800
----------------------------------------------------------------------
.../bucketing/TableBucketingTestCase.scala | 24 ++
.../processing/newflow/sort/SorterFactory.java | 73 +++++
...arallelReadMergeSorterWithBucketingImpl.java | 18 +-
.../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
.../impl/UnsafeParallelReadMergeSorterImpl.java | 6 +-
...arallelReadMergeSorterWithBucketingImpl.java | 264 +++++++++++++++++++
.../newflow/sort/unsafe/UnsafeSortDataRows.java | 36 ++-
.../UnsafeSingleThreadFinalSortFilesMerger.java | 5 +-
...ConverterProcessorWithBucketingStepImpl.java | 3 +-
.../newflow/steps/SortProcessorStepImpl.java | 25 +-
10 files changed, 423 insertions(+), 38 deletions(-)
----------------------------------------------------------------------