Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 12:25:05 UTC
[45/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad]
Support specifying sort column bounds in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index f605b22..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
-import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
-import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(
- UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private BucketingInfo bucketingInfo;
-
- public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
- BucketingInfo bucketingInfo) {
- this.bucketingInfo = bucketingInfo;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
- }
-
- @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
- UnsafeIntermediateMerger[] intermediateFileMergers =
- new UnsafeIntermediateMerger[sortDataRows.length];
- int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
- inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets();
- if (inMemoryChunkSizeInMB < 5) {
- inMemoryChunkSizeInMB = 5;
- }
- try {
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
- SortParameters parameters = sortParameters.getCopy();
- parameters.setPartitionID(i + "");
- setTempLocation(parameters);
- intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
- sortDataRows[i] =
- new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
- sortDataRows[i].initialize();
- }
- } catch (MemoryException e) {
- throw new CarbonDataLoadingException(e);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(executorService);
- final int batchSize = CarbonProperties.getInstance().getBatchSize();
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this
- .threadStatusObserver));
- }
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- processRowToNextStep(sortDataRows, sortParameters);
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- try {
- for (int i = 0; i < intermediateFileMergers.length; i++) {
- intermediateFileMergers[i].finish();
- }
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
-
- Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
- for (int i = 0; i < sortDataRows.length; i++) {
- batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
- }
-
- return batchIterator;
- }
-
- private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
- String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
- false, false);
- // Set the data file location
- String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
- }
-
- @Override public void close() {
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows || sortDataRows.length == 0) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- for (int i = 0; i < sortDataRows.length; i++) {
- // start sorting
- sortDataRows[i].startSorting();
- }
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- return false;
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
- parameters.getTaskNo(), parameters.getSegmentId(),
- false, false);
- String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
- CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- parameters.setTempFileLocation(tmpLoc);
- }
-
- /**
- * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows}
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private UnsafeSortDataRows[] sortDataRows;
-
- private ThreadStatusObserver threadStatusObserver;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
- UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.threadStatusObserver = threadStatusObserver;
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber];
- synchronized (sortDataRow) {
- sortDataRow.addRow(row.getData());
- }
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- this.threadStatusObserver.notifyFailed(e);
- }
- }
-
- }
-
- private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-
- private int batchSize;
-
- private boolean firstRow;
-
- private UnsafeIntermediateMerger intermediateMerger;
-
- public MergedDataIterator(int batchSize,
- UnsafeIntermediateMerger intermediateMerger) {
- this.batchSize = batchSize;
- this.intermediateMerger = intermediateMerger;
- this.firstRow = true;
- }
-
- private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
-
- @Override public boolean hasNext() {
- if (firstRow) {
- firstRow = false;
- finalMerger = getFinalMerger();
- List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
- finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
- intermediateMerger.getMergedPages());
- }
- return finalMerger.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
- while (finalMerger.hasNext() && counter < batchSize) {
- rowBatch.addRow(new CarbonRow(finalMerger.next()));
- counter++;
- }
- return rowBatch;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
new file mode 100644
index 0000000..99d6627
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.impl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
+import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * It reads data from an array of iterators in parallel and does a merge sort.
+ * First it sorts the data and writes it to temp files. These temp files are then
+ * merge-sorted to produce the final merge sort result.
+ * This step is specifically for data loading with a specified column value range,
+ * such as bucketing or sort_column_bounds; it sorts each range of data separately
+ * and writes it to temp files.
+ */
+public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSorter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(
+ UnsafeParallelReadMergeSorterWithColumnRangeImpl.class.getName());
+
+ private SortParameters originSortParameters;
+ private UnsafeIntermediateMerger[] intermediateFileMergers;
+ private int inMemoryChunkSizeInMB;
+ private AtomicLong rowCounter;
+ private ColumnRangeInfo columnRangeInfo;
+ /**
+ * counters to collect information about rows processed by each range
+ */
+ private List<AtomicLong> insideRowCounterList;
+
+ public UnsafeParallelReadMergeSorterWithColumnRangeImpl(AtomicLong rowCounter,
+ ColumnRangeInfo columnRangeInfo) {
+ this.rowCounter = rowCounter;
+ this.columnRangeInfo = columnRangeInfo;
+ }
+
+ @Override public void initialize(SortParameters sortParameters) {
+ this.originSortParameters = sortParameters;
+ int totalInMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+ inMemoryChunkSizeInMB = totalInMemoryChunkSizeInMB / columnRangeInfo.getNumOfRanges();
+ if (inMemoryChunkSizeInMB < 5) {
+ inMemoryChunkSizeInMB = 5;
+ }
+ this.insideRowCounterList = new ArrayList<>(columnRangeInfo.getNumOfRanges());
+ for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+ insideRowCounterList.add(new AtomicLong(0));
+ }
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+ throws CarbonDataLoadingException {
+ UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[columnRangeInfo.getNumOfRanges()];
+ intermediateFileMergers = new UnsafeIntermediateMerger[columnRangeInfo.getNumOfRanges()];
+ SortParameters[] sortParameterArray = new SortParameters[columnRangeInfo.getNumOfRanges()];
+ try {
+ for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) {
+ SortParameters parameters = originSortParameters.getCopy();
+ parameters.setPartitionID(i + "");
+ parameters.setRangeId(i);
+ sortParameterArray[i] = parameters;
+ setTempLocation(parameters);
+ intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters);
+ sortDataRows[i] =
+ new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
+ sortDataRows[i].initialize();
+ }
+ } catch (MemoryException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
+ this.threadStatusObserver = new ThreadStatusObserver(executorService);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
+ this.insideRowCounterList, this.threadStatusObserver));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRows, originSortParameters);
+ } catch (Exception e) {
+ checkError();
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ checkError();
+ try {
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].finish();
+ }
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+
+ Iterator<CarbonRowBatch>[] batchIterator = new Iterator[columnRangeInfo.getNumOfRanges()];
+ for (int i = 0; i < sortDataRows.length; i++) {
+ batchIterator[i] =
+ new MergedDataIterator(sortParameterArray[i], batchSize, intermediateFileMergers[i]);
+ }
+
+ return batchIterator;
+ }
+
+ private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
+ String[] storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()),
+ sortParameters.getSegmentId() + "", false, false);
+ // Set the data file location
+ String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
+ File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation);
+ }
+
+ @Override public void close() {
+ for (int i = 0; i < intermediateFileMergers.length; i++) {
+ intermediateFileMergers[i].close();
+ }
+ }
+
+ /**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows || sortDataRows.length == 0) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ for (int i = 0; i < sortDataRows.length; i++) {
+ // start sorting
+ sortDataRows[i].startSorting();
+ }
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ return false;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ private void setTempLocation(SortParameters parameters) {
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
+ parameters.getTaskNo(), parameters.getSegmentId(),
+ false, false);
+ String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
+ CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+ LOGGER.error("set temp location: " + StringUtils.join(tmpLoc, ", "));
+ parameters.setTempFileLocation(tmpLoc);
+ }
+
+ /**
+ * This thread iterates the iterator and adds the rows to {@link UnsafeSortDataRows}
+ */
+ private static class SortIteratorThread implements Runnable {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ private UnsafeSortDataRows[] sortDataRows;
+ private AtomicLong rowCounter;
+ private List<AtomicLong> insideRowCounterList;
+ private ThreadStatusObserver threadStatusObserver;
+
+ public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+ UnsafeSortDataRows[] sortDataRows, AtomicLong rowCounter,
+ List<AtomicLong> insideRowCounterList,
+ ThreadStatusObserver threadStatusObserver) {
+ this.iterator = iterator;
+ this.sortDataRows = sortDataRows;
+ this.rowCounter = rowCounter;
+ this.insideRowCounterList = insideRowCounterList;
+ this.threadStatusObserver = threadStatusObserver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ int i = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ if (row != null) {
+ UnsafeSortDataRows sortDataRow = sortDataRows[row.getRangeId()];
+ synchronized (sortDataRow) {
+ rowCounter.getAndIncrement();
+ insideRowCounterList.get(row.getRangeId()).getAndIncrement();
+ sortDataRow.addRow(row.getData());
+ }
+ }
+ }
+ }
+ LOGGER.info("Rows processed by each range: " + insideRowCounterList);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ this.threadStatusObserver.notifyFailed(e);
+ }
+ }
+ }
+
+ private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private SortParameters sortParameters;
+
+ private int batchSize;
+
+ private boolean firstRow;
+
+ private UnsafeIntermediateMerger intermediateMerger;
+
+ public MergedDataIterator(SortParameters sortParameters, int batchSize,
+ UnsafeIntermediateMerger intermediateMerger) {
+ this.sortParameters = sortParameters;
+ this.batchSize = batchSize;
+ this.intermediateMerger = intermediateMerger;
+ this.firstRow = true;
+ }
+
+ private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
+
+ @Override public boolean hasNext() {
+ if (firstRow) {
+ firstRow = false;
+ finalMerger = getFinalMerger(sortParameters);
+ List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
+ finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
+ intermediateMerger.getMergedPages());
+ }
+ return finalMerger.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ int counter = 0;
+ CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
+ while (finalMerger.hasNext() && counter < batchSize) {
+ rowBatch.addRow(new CarbonRow(finalMerger.next()));
+ counter++;
+ }
+ return rowBatch;
+ }
+ }
+}
\ No newline at end of file
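
In the new sorter above, every converted row already carries a range id; each reader thread drops the row into the UnsafeSortDataRows instance for that range and locks only that one sorter, so different ranges do not block each other. Below is a minimal, self-contained sketch of this dispatch pattern (RangeBuffer is a hypothetical stand-in for UnsafeSortDataRows, not CarbonData API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RangeDispatchSketch {

  // Hypothetical stand-in for UnsafeSortDataRows: collects rows of one range.
  static class RangeBuffer {
    private final List<Object[]> rows = new ArrayList<>();
    void addRow(Object[] row) {
      rows.add(row);
    }
    int size() {
      return rows.size();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int numRanges = 3;
    RangeBuffer[] buffers = new RangeBuffer[numRanges];
    for (int i = 0; i < numRanges; i++) {
      buffers[i] = new RangeBuffer();
    }

    // Two reader threads, mirroring the SortIteratorThread pool above.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int t = 0; t < 2; t++) {
      pool.execute(() -> {
        for (int r = 0; r < 100; r++) {
          int rangeId = r % numRanges;          // in the real step: row.getRangeId()
          RangeBuffer target = buffers[rangeId];
          synchronized (target) {               // lock only the targeted range
            target.addRow(new Object[] { rangeId, r });
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    for (int i = 0; i < numRanges; i++) {
      System.out.println("range " + i + " received " + buffers[i].size() + " rows");
    }
  }
}
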
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 5d038d3..eaa858e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -366,9 +366,9 @@ public class UnsafeSortDataRows {
// create a new file and pick a temp directory randomly every time
String tmpDir = parameters.getTempFileLocation()[
new Random().nextInt(parameters.getTempFileLocation().length)];
- File sortTempFile = new File(
- tmpDir + File.separator + parameters.getTableName()
- + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+ File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName()
+ + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+ + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
writeDataToFile(page, sortTempFile);
LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
+ " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 0d24e01..104f3f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -112,9 +112,9 @@ public class UnsafeIntermediateMerger {
String[] tempFileLocations = parameters.getTempFileLocation();
String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
- File file = new File(
- targetLocation + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+ File file = new File(targetLocation + File.separator + parameters.getTableName()
+ + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+ + CarbonCommonConstants.MERGERD_EXTENSION);
UnsafeIntermediateFileMerger merger =
new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
mergerTask.add(executorService.submit(merger));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 64f3c25..b1dc156 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -147,19 +147,20 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
}
private List<File> getFilesToMergeSort() {
+ // this can be partitionId, bucketId or rangeId, let's call it rangeId
+ final int rangeId = parameters.getRangeId();
+
FileFilter fileFilter = new FileFilter() {
public boolean accept(File pathname) {
- return pathname.getName().startsWith(tableName);
+ return pathname.getName().startsWith(tableName + '_' + rangeId);
}
};
// get all the merged files
List<File> files = new ArrayList<File>(tempFileLocation.length);
- for (String tempLoc : tempFileLocation)
- {
+ for (String tempLoc : tempFileLocation) {
File[] subFiles = new File(tempLoc).listFiles(fileFilter);
- if (null != subFiles && subFiles.length > 0)
- {
+ if (null != subFiles && subFiles.length > 0) {
files.addAll(Arrays.asList(subFiles));
}
}
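
Because the temp file names now embed the range id, the final merger above can restrict its scan of the temp directories to its own range with a simple name-prefix filter. A hedged sketch of that lookup outside of CarbonData:

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RangeFileFilterSketch {
  // Collects only the sort temp files whose names start with <table>_<rangeId>,
  // i.e. the files produced for this merger's range by the naming scheme above.
  static List<File> filesForRange(String[] tempDirs, final String tableName, final int rangeId) {
    FileFilter fileFilter = new FileFilter() {
      public boolean accept(File pathname) {
        return pathname.getName().startsWith(tableName + '_' + rangeId);
      }
    };
    List<File> files = new ArrayList<>();
    for (String dir : tempDirs) {
      File[] subFiles = new File(dir).listFiles(fileFilter);
      if (subFiles != null && subFiles.length > 0) {
        files.addAll(Arrays.asList(subFiles));
      }
    }
    return files;
  }
}
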
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 90a340d..72a8c25 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -19,21 +19,33 @@ package org.apache.carbondata.processing.loading.steps;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.BadRecordsLogger;
import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
import org.apache.carbondata.processing.loading.converter.RowConverter;
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
+import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.commons.lang3.StringUtils;
+
/**
* Replace row data fields with dictionary values if column is configured dictionary encoded.
* And nondictionary columns as well as complex columns will be converted to byte[].
@@ -41,7 +53,10 @@ import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
private List<RowConverter> converters;
+ private Partitioner<CarbonRow> partitioner;
private BadRecordsLogger badRecordLogger;
+ private boolean isSortColumnRangeEnabled = false;
+ private boolean isBucketColumnEnabled = false;
public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
@@ -64,6 +79,81 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
configuration.setCardinalityFinder(converter);
converters.add(converter);
converter.initialize();
+
+ if (null != configuration.getBucketingInfo()) {
+ this.isBucketColumnEnabled = true;
+ initializeBucketColumnPartitioner();
+ } else if (null != configuration.getSortColumnRangeInfo()) {
+ this.isSortColumnRangeEnabled = true;
+ initializeSortColumnRangesPartitioner();
+ }
+ }
+
+ /**
+ * initialize partitioner for bucket column
+ */
+ private void initializeBucketColumnPartitioner() {
+ List<Integer> indexes = new ArrayList<>();
+ List<ColumnSchema> columnSchemas = new ArrayList<>();
+ DataField[] inputDataFields = getOutput();
+ BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+ for (int i = 0; i < inputDataFields.length; i++) {
+ for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+ if (inputDataFields[i].getColumn().getColName()
+ .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+ indexes.add(i);
+ columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+ break;
+ }
+ }
+ }
+
+ // hash partitioner to dispatch rows by bucket column
+ this.partitioner =
+ new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumOfRanges());
+ }
+
+
+ /**
+ * initialize partitioner for sort column ranges
+ */
+ private void initializeSortColumnRangesPartitioner() {
+ // convert user specified sort-column ranges
+ SortColumnRangeInfo sortColumnRangeInfo = configuration.getSortColumnRangeInfo();
+ int rangeValueCnt = sortColumnRangeInfo.getUserSpecifiedRanges().length;
+ CarbonRow[] convertedSortColumnRanges = new CarbonRow[rangeValueCnt];
+ for (int i = 0; i < rangeValueCnt; i++) {
+ Object[] fakeOriginRow = new Object[configuration.getDataFields().length];
+ String[] oneBound = StringUtils.splitPreserveAllTokens(
+ sortColumnRangeInfo.getUserSpecifiedRanges()[i], sortColumnRangeInfo.getSeparator(), -1);
+ // set the corresponding sort column
+ int j = 0;
+ for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) {
+ fakeOriginRow[colIdx] = oneBound[j++];
+ }
+ CarbonRow fakeCarbonRow = new CarbonRow(fakeOriginRow);
+ convertFakeRow(fakeCarbonRow, sortColumnRangeInfo);
+ convertedSortColumnRanges[i] = fakeCarbonRow;
+ }
+ // sort the range bounds (sort in carbon is a little different from what we think)
+ Arrays.sort(convertedSortColumnRanges,
+ new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
+ sortColumnRangeInfo.getIsSortColumnNoDict()));
+
+ // range partitioner to dispatch rows by sort columns
+ this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
+ new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
+ sortColumnRangeInfo.getIsSortColumnNoDict()));
+ }
+
+ // only convert sort column fields
+ private void convertFakeRow(CarbonRow fakeRow, SortColumnRangeInfo sortColumnRangeInfo) {
+ FieldConverter[] fieldConverters = converters.get(0).getFieldConverters();
+ BadRecordLogHolder logHolder = new BadRecordLogHolder();
+ logHolder.setLogged(false);
+ for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) {
+ fieldConverters[colIdx].convert(fakeRow, logHolder);
+ }
}
/**
@@ -102,6 +192,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
while (rowBatch.hasNext()) {
CarbonRow convertRow = localConverter.convert(rowBatch.next());
+ if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
+ short rangeNumber = (short) partitioner.getPartition(convertRow);
+ convertRow.setRangeId(rangeNumber);
+ }
rowBatch.setPreviousRow(convertRow);
}
rowCounter.getAndAdd(rowBatch.getSize());
@@ -134,6 +228,12 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
}
@Override protected String getStepName() {
- return "Data Converter";
+ if (isBucketColumnEnabled) {
+ return "Data Converter with Bucketing";
+ } else if (isSortColumnRangeEnabled) {
+ return "Data Converter with sort column range";
+ } else {
+ return "Data Converter";
+ }
}
}
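
The converter step above converts and sorts the user-specified bound rows, builds a RangePartitionerImpl from them, and stamps the resulting partition index onto each row as its range id. The partitioner implementation itself is not part of this mail; the following is only a plausible sketch of range lookup over n sorted bounds (yielding n + 1 ranges), not the actual class:

import java.util.Comparator;

class RangeLookupSketch<T> {
  private final T[] sortedBounds;
  private final Comparator<T> comparator;

  RangeLookupSketch(T[] sortedBounds, Comparator<T> comparator) {
    this.sortedBounds = sortedBounds;
    this.comparator = comparator;
  }

  // Returns the index of the first bound greater than the key; keys beyond the
  // last bound fall into range sortedBounds.length, so n bounds yield n + 1 ranges.
  int getPartition(T key) {
    int low = 0;
    int high = sortedBounds.length;
    while (low < high) {
      int mid = (low + high) >>> 1;
      if (comparator.compare(key, sortedBounds[mid]) < 0) {
        high = mid;
      } else {
        low = mid + 1;
      }
    }
    return low;
  }
}

For example, with bounds {10, 20} and a natural-order comparator, keys 5, 15 and 25 would land in ranges 0, 1 and 2 respectively.
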
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
deleted file mode 100644
index a1181c9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.loading.BadRecordsLogger;
-import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.converter.RowConverter;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.partition.Partitioner;
-import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
-
-/**
- * Replace row data fields with dictionary values if column is configured dictionary encoded.
- * And nondictionary columns as well as complex columns will be converted to byte[].
- */
-public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
-
- private List<RowConverter> converters;
-
- private Partitioner<Object[]> partitioner;
-
- private BadRecordsLogger badRecordLogger;
-
- public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
- AbstractDataLoadProcessorStep child) {
- super(configuration, child);
- }
-
- @Override
- public DataField[] getOutput() {
- return child.getOutput();
- }
-
- @Override
- public void initialize() throws IOException {
- super.initialize();
- child.initialize();
- converters = new ArrayList<>();
- badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
- RowConverter converter =
- new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
- configuration.setCardinalityFinder(converter);
- converters.add(converter);
- converter.initialize();
- List<Integer> indexes = new ArrayList<>();
- List<ColumnSchema> columnSchemas = new ArrayList<>();
- DataField[] inputDataFields = getOutput();
- BucketingInfo bucketingInfo = configuration.getBucketingInfo();
- for (int i = 0; i < inputDataFields.length; i++) {
- for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
- if (inputDataFields[i].getColumn().getColName()
- .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
- indexes.add(i);
- columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
- break;
- }
- }
- }
- partitioner =
- new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
- }
-
- /**
- * Create the iterator using child iterator.
- *
- * @param childIter
- * @return new iterator with step specific processing.
- */
- @Override
- protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
- return new CarbonIterator<CarbonRowBatch>() {
- RowConverter localConverter;
- private boolean first = true;
- @Override public boolean hasNext() {
- if (first) {
- first = false;
- localConverter = converters.get(0).createCopyForNewThread();
- converters.add(localConverter);
- }
- return childIter.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- return processRowBatch(childIter.next(), localConverter);
- }
- };
- }
-
- /**
- * Process the batch of rows as per the step logic.
- *
- * @param rowBatch
- * @return processed row.
- */
- protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
- while (rowBatch.hasNext()) {
- CarbonRow row = rowBatch.next();
- short bucketNumber = (short) partitioner.getPartition(row.getData());
- CarbonRow convertRow = localConverter.convert(row);
- convertRow.bucketNumber = bucketNumber;
- rowBatch.setPreviousRow(convertRow);
- }
- rowCounter.getAndAdd(rowBatch.getSize());
- // reuse the origin batch
- rowBatch.rewind();
- return rowBatch;
- }
-
- @Override
- protected CarbonRow processRow(CarbonRow row) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- if (!closed) {
- super.close();
- if (null != badRecordLogger) {
- badRecordLogger.closeStreams();
- CarbonBadRecordUtil.renameBadRecord(configuration);
- }
- if (converters != null) {
- for (RowConverter converter : converters) {
- converter.finish();
- }
- }
- }
- }
- @Override protected String getStepName() {
- return "Data Converter with Bucketing";
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 58009af..0467b11 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -17,7 +17,15 @@
package org.apache.carbondata.processing.loading.steps;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -25,6 +33,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
@@ -92,7 +101,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
+ ExecutorService rangeExecutorService = Executors.newFixedThreadPool(iterators.length,
+ new CarbonThreadFactory("WriterForwardPool: " + tableName));
+ List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length);
int i = 0;
+ // do this concurrently
for (Iterator<CarbonRowBatch> iterator : iterators) {
String[] storeLocation = getStoreLocation(tableIdentifier);
@@ -112,9 +125,19 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
if (!rowsNotExist) {
finish(dataHandler);
}
+ rangeExecutorServiceSubmitList.add(
+ rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i)));
i++;
}
-
+ try {
+ rangeExecutorService.shutdown();
+ rangeExecutorService.awaitTermination(2, TimeUnit.DAYS);
+ for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) {
+ rangeExecutorServiceSubmitList.get(j).get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new CarbonDataWriterException(e);
+ }
} catch (CarbonDataWriterException e) {
LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
throw new CarbonDataLoadingException(
@@ -130,6 +153,51 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
return "Data Writer";
}
+ /**
+ * Used to forward rows to different ranges based on range id.
+ */
+ private final class WriterForwarder implements Callable<Void> {
+ private Iterator<CarbonRowBatch> insideRangeIterator;
+ private CarbonTableIdentifier tableIdentifier;
+ private int rangeId;
+
+ public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator,
+ CarbonTableIdentifier tableIdentifier, int rangeId) {
+ this.insideRangeIterator = insideRangeIterator;
+ this.tableIdentifier = tableIdentifier;
+ this.rangeId = rangeId;
+ }
+
+ @Override public Void call() throws Exception {
+ LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
+ + ", range: " + rangeId);
+ processRange(insideRangeIterator, tableIdentifier, rangeId);
+ return null;
+ }
+ }
+
+ private void processRange(Iterator<CarbonRowBatch> insideRangeIterator,
+ CarbonTableIdentifier tableIdentifier, int rangeId) {
+ String[] storeLocation = getStoreLocation(tableIdentifier);
+
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0);
+ CarbonFactHandler dataHandler = null;
+ boolean rowsNotExist = true;
+ while (insideRangeIterator.hasNext()) {
+ if (rowsNotExist) {
+ rowsNotExist = false;
+ dataHandler = CarbonFactHandlerFactory
+ .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+ dataHandler.initialise();
+ }
+ processBatch(insideRangeIterator.next(), dataHandler);
+ }
+ if (!rowsNotExist) {
+ finish(dataHandler);
+ }
+ }
+
public void finish(CarbonFactHandler dataHandler) {
CarbonTableIdentifier tableIdentifier =
configuration.getTableIdentifier().getCarbonTableIdentifier();
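
In the writer step above, each range iterator is handed to its own WriterForwarder task. A minimal sketch of the submit/await/get pattern used there, illustrating why Future.get() is still called after awaitTermination: get() re-throws any exception an individual range writer hit, so one failed range fails the whole load instead of being silently dropped (the println stands in for processRange):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PerRangeWriteSketch {
  public static void main(String[] args) throws Exception {
    int numRanges = 4;
    ExecutorService pool = Executors.newFixedThreadPool(numRanges);
    List<Future<Void>> futures = new ArrayList<>(numRanges);
    for (int rangeId = 0; rangeId < numRanges; rangeId++) {
      final int id = rangeId;
      futures.add(pool.submit(() -> {
        // stands in for processRange(insideRangeIterator, tableIdentifier, id)
        System.out.println("writing range " + id);
        return (Void) null;
      }));
    }
    pool.shutdown();
    pool.awaitTermination(2, TimeUnit.DAYS);
    for (Future<Void> future : futures) {
      future.get();   // surfaces any failure from an individual range writer
    }
  }
}
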
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index a4ac0ea..1a839a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -119,9 +119,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
}
private List<File> getFilesToMergeSort() {
+ final int rangeId = sortParameters.getRangeId();
FileFilter fileFilter = new FileFilter() {
public boolean accept(File pathname) {
- return pathname.getName().startsWith(tableName);
+ return pathname.getName().startsWith(tableName + '_' + rangeId);
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index c7efbd9..a5caf7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -212,9 +212,9 @@ public class SortDataRows {
// create new file and choose folder randomly
String[] tmpLocation = parameters.getTempFileLocation();
String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)];
- File file = new File(
- locationChosen + File.separator + parameters.getTableName() +
- System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+ File file = new File(locationChosen + File.separator + parameters.getTableName()
+ + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+ + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
writeDataToFile(recordHolderList, this.entryCount, file);
}
@@ -325,8 +325,9 @@ public class SortDataRows {
String[] tmpFileLocation = parameters.getTempFileLocation();
String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)];
File sortTempFile = new File(
- locationChosen + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+ locationChosen + File.separator + parameters.getTableName()
+ + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+ + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile);
// add sort temp filename to and arrayList. When the list size reaches 20 then
// intermediate merging of sort temp files will be triggered
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
index 9c995a5..0e3f6bd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java
@@ -94,9 +94,9 @@ public class SortIntermediateFileMerger {
private void startIntermediateMerging(File[] intermediateFiles) {
int index = new Random().nextInt(parameters.getTempFileLocation().length);
String chosenTempDir = parameters.getTempFileLocation()[index];
- File file = new File(
- chosenTempDir + File.separator + parameters.getTableName() + System
- .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
+ File file = new File(chosenTempDir + File.separator + parameters.getTableName()
+ + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+ + CarbonCommonConstants.MERGERD_EXTENSION);
IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file);
mergerTask.add(executorService.submit(merger));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 4d31f87..4d333ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -119,6 +119,7 @@ public class SortParameters implements Serializable {
private int numberOfCores;
private int batchSortSizeinMb;
+ private int rangeId = 0;
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
@@ -147,6 +148,7 @@ public class SortParameters implements Serializable {
parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
parameters.numberOfCores = numberOfCores;
parameters.batchSortSizeinMb = batchSortSizeinMb;
+ parameters.rangeId = rangeId;
return parameters;
}
@@ -429,6 +431,14 @@ public class SortParameters implements Serializable {
return parameters;
}
+ public int getRangeId() {
+ return rangeId;
+ }
+
+ public void setRangeId(int rangeId) {
+ this.rangeId = rangeId;
+ }
+
/**
* this method will set the boolean mapping for no dictionary sort columns
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index b795696..c0acadd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -167,12 +167,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
blockletProcessingCount = new AtomicInteger(0);
producerExecutorService = Executors.newFixedThreadPool(numberOfCores,
- new CarbonThreadFactory("ProducerPool:" + model.getTableName()));
+ new CarbonThreadFactory("ProducerPool:" + model.getTableName()
+ + ", range: " + model.getBucketId()));
producerExecutorServiceTaskList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
LOGGER.info("Initializing writer executors");
consumerExecutorService = Executors
- .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()));
+ .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName()
+ + ", range: " + model.getBucketId()));
consumerExecutorServiceTaskList = new ArrayList<>(1);
semaphore = new Semaphore(numberOfCores);
tablePageList = new TablePageList();