You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/02 08:08:41 UTC
[1/2] incubator-carbondata git commit: Optimize data loading
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f47bbc2c2 -> e7e370cac
Optimize data loading
Handled broadcast fails.
Updated as per comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/63434fac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/63434fac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/63434fac
Branch: refs/heads/master
Commit: 63434fac5f4dc2d7eb9d03819401e243744a5f48
Parents: f47bbc2
Author: ravipesala <ra...@gmail.com>
Authored: Sun Nov 27 17:39:36 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 2 15:52:02 2016 +0800
----------------------------------------------------------------------
.../carbondata/common/CarbonIterator.java | 14 +++
.../AbstractDetailQueryResultIterator.java | 2 +-
.../carbondata/hadoop/csv/CSVInputFormat.java | 42 +++++---
.../recorditerator/RecordReaderIterator.java | 31 +++++-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 40 ++++---
.../processing/iterator/CarbonIterator.java | 38 -------
.../processing/newflow/DataLoadExecutor.java | 5 +-
.../newflow/DataLoadProcessBuilder.java | 8 +-
.../sort/impl/ParallelReadMergeSorterImpl.java | 90 ++++++++--------
.../newflow/steps/InputProcessorStepImpl.java | 105 ++++++++++++++-----
.../sortandgroupby/sortdata/SortDataRows.java | 40 +++++++
11 files changed, 268 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
index 9141bcd..b1a5b5a 100644
--- a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
+++ b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java
@@ -35,4 +35,18 @@ public abstract class CarbonIterator<E> implements Iterator<E> {
throw new UnsupportedOperationException("remove");
}
+ /**
+ * Initialize the iterator
+ */
+ public void initialize() {
+ // Subclasses can override this method to provide initialization logic
+ }
+
+ /**
+ * Close the resources
+ */
+ public void close() {
+ // Subclasses can override this method to provide resource-cleanup logic
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index c8c61b0..07ccab4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -114,7 +114,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
DataRefNode startDataBlock = finder
.findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) {
+ while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
startDataBlock = startDataBlock.getNextDataRefNode();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
index 3ea96ac..ca27673 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
@@ -66,6 +66,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
public static final String ESCAPE_DEFAULT = "\\";
public static final String HEADER_PRESENT = "carbon.csvinputformat.header.present";
public static final boolean HEADER_PRESENT_DEFAULT = false;
+ public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
+ public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
@Override
public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
@@ -85,10 +87,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
/**
* Sets the comment character in the configuration. By default it is '#'.
- * @param commentChar
* @param configuration
+ * @param commentChar
*/
- public static void setCommentCharacter(String commentChar, Configuration configuration) {
+ public static void setCommentCharacter(Configuration configuration, String commentChar) {
if (commentChar != null && !commentChar.isEmpty()) {
configuration.set(COMMENT, commentChar);
}
@@ -96,10 +98,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
/**
* Sets the delimiter in the configuration. By default it is ','.
- * @param delimiter
* @param configuration
+ * @param delimiter
*/
- public static void setCSVDelimiter(String delimiter, Configuration configuration) {
+ public static void setCSVDelimiter(Configuration configuration, String delimiter) {
if (delimiter != null && !delimiter.isEmpty()) {
configuration.set(DELIMITER, delimiter);
}
@@ -107,10 +109,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
/**
* Sets the escape character in the configuration. By default it is '\'.
- * @param escapeCharacter
* @param configuration
+ * @param escapeCharacter
*/
- public static void setEscapeCharacter(String escapeCharacter, Configuration configuration) {
+ public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
configuration.set(ESCAPE, escapeCharacter);
}
@@ -118,26 +120,37 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
/**
* Whether the header needs to be read from the csv file or not. By default it is false.
- * @param headerExtractEnable
* @param configuration
+ * @param headerExtractEnable
*/
- public static void setHeaderExtractionEnabled(boolean headerExtractEnable,
- Configuration configuration) {
+ public static void setHeaderExtractionEnabled(Configuration configuration,
+ boolean headerExtractEnable) {
configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
}
/**
* Sets the quote character in the configuration. By default it is '"'.
- * @param quoteCharacter
* @param configuration
+ * @param quoteCharacter
*/
- public static void setQuoteCharacter(String quoteCharacter, Configuration configuration) {
+ public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
configuration.set(QUOTE, quoteCharacter);
}
}
/**
+ * Sets the read buffer size to configuration.
+ * @param configuration
+ * @param bufferSize
+ */
+ public static void setReadBufferSize(Configuration configuration, String bufferSize) {
+ if (bufferSize != null && !bufferSize.isEmpty()) {
+ configuration.set(READ_BUFFER_SIZE, bufferSize);
+ }
+ }
+
+ /**
* Treats value as line in file. Key is null.
*/
public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
@@ -163,8 +176,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
Configuration job = context.getConfiguration();
CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(file);
- InputStream inputStream = null;
+ int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ InputStream inputStream;
if (codec != null) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
@@ -209,6 +223,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
parserSettings.setIgnoreLeadingWhitespaces(false);
parserSettings.setIgnoreTrailingWhitespaces(false);
parserSettings.setSkipEmptyLines(false);
+ // TODO get from csv file.
+ parserSettings.setMaxColumns(1000);
parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
if (start == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
index 478af0a..a1bc1e9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
@@ -16,12 +16,16 @@
*/
package org.apache.carbondata.hadoop.csv.recorditerator;
+import java.io.IOException;
+
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.hadoop.io.StringArrayWritable;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* It is a wrapper iterator around the {@link RecordReader}.
@@ -38,8 +42,15 @@ public class RecordReaderIterator extends CarbonIterator<Object []> {
*/
private boolean isConsumed;
- public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader) {
+ private InputSplit split;
+
+ private TaskAttemptContext context;
+
+ public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
+ InputSplit split, TaskAttemptContext context) {
this.recordReader = recordReader;
+ this.split = split;
+ this.context = context;
}
@Override
@@ -65,4 +76,22 @@ public class RecordReaderIterator extends CarbonIterator<Object []> {
throw new CarbonDataLoadingException(e);
}
}
+
+ @Override
+ public void initialize() {
+ try {
+ recordReader.initialize(split, context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ recordReader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 67d1ce0..44a2416 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -29,18 +29,18 @@ import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableCon
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.hadoop.csv.CSVInputFormat
import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.DataLoadExecutor
import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.CarbonQueryUtil
@@ -133,9 +133,13 @@ class NewCarbonDataLoadRDD[K, V](
throw e
}
- def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
+ def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
- val configuration: Configuration = confBroadcast.value.value
+ var configuration: Configuration = confBroadcast.value.value
+ // The broadcast value can come back null on some executors (broadcast failure);
+ // fall back to a default Configuration so the load can still proceed.
+ if (configuration == null) {
+ configuration = new Configuration()
+ }
configureCSVInputFormat(configuration)
val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
val format = new CSVInputFormat
@@ -160,10 +164,11 @@ class NewCarbonDataLoadRDD[K, V](
partitionID, split.partitionBlocksDetail.length)
val readers =
split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.foreach { case (reader, index) =>
- reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader,
+ split.partitionBlocksDetail(index),
+ hadoopAttemptContext)
}
- readers.map(new RecordReaderIterator(_))
} else {
// for node partition
val split = theSplit.asInstanceOf[CarbonNodePartition]
@@ -185,21 +190,22 @@ class NewCarbonDataLoadRDD[K, V](
StandardLogService.setThreadName(blocksID, null)
val readers =
split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.foreach { case (reader, index) =>
- reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
+ readers.zipWithIndex.map { case (reader, index) =>
+ new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
}
- readers.map(new RecordReaderIterator(_))
}
}
def configureCSVInputFormat(configuration: Configuration): Unit = {
- CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
- CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
- CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
- CSVInputFormat.setHeaderExtractionEnabled(
- carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
- configuration)
- CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
+ CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
+ CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
+ CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+ CSVInputFormat.setHeaderExtractionEnabled(configuration,
+ carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
+ CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
+ CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java b/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
deleted file mode 100644
index 35bba6f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.iterator;
-
-public interface CarbonIterator<E> {
- /**
- * Returns <tt>true</tt> if the iteration has more elements. (In other
- * words, returns <tt>true</tt> if <tt>next</tt> would return an element
- * rather than throwing an exception.)
- *
- * @return <tt>true</tt> if the iterator has more elements.
- */
- boolean hasNext();
-
- /**
- * Returns the next element in the iteration.
- *
- * @return the next element in the iteration.
- */
- E next();
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index 746e0f2..fc24aa8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -16,8 +16,7 @@
*/
package org.apache.carbondata.processing.newflow;
-import java.util.Iterator;
-
+import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
@@ -35,7 +34,7 @@ public class DataLoadExecutor {
LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
public void execute(CarbonLoadModel loadModel, String storeLocation,
- Iterator<Object[]>[] inputIterators) throws Exception {
+ CarbonIterator<Object[]>[] inputIterators) throws Exception {
AbstractDataLoadProcessorStep loadProcessorStep = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 92c677c..a5388d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -18,13 +18,14 @@ package org.apache.carbondata.processing.newflow;
import java.io.File;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
@@ -50,7 +51,7 @@ public final class DataLoadProcessBuilder {
LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
- Iterator[] inputIterators) throws Exception {
+ CarbonIterator[] inputIterators) throws Exception {
CarbonDataLoadConfiguration configuration =
createConfiguration(loadModel, storeLocation);
// 1. Reads the data input iterators and parses the data.
@@ -133,6 +134,9 @@ public final class DataLoadProcessBuilder {
loadModel.getBadRecordsAction().split(",")[1]);
configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
loadModel.getFactFilePath());
+ if(CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) == null) {
+ CarbonMetadata.getInstance().addCarbonTable(carbonTable);
+ }
List<CarbonDimension> dimensions =
carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
List<CarbonMeasure> measures =
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index cd487ec..e2e995c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.DataField;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -90,25 +91,23 @@ public class ParallelReadMergeSorterImpl implements Sorter {
@Override
public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
throws CarbonDataLoadingException {
- SortDataRows[] sortDataRows = new SortDataRows[iterators.length];
+ SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
+ final int batchSize = CarbonProperties.getInstance().getBatchSize();
try {
- for (int i = 0; i < iterators.length; i++) {
- sortDataRows[i] = new SortDataRows(sortParameters, intermediateFileMerger);
- // initialize sort
- sortDataRows[i].initialize();
- }
+ sortDataRow.initialize();
} catch (CarbonSortKeyAndGroupByException e) {
throw new CarbonDataLoadingException(e);
}
this.executorService = Executors.newFixedThreadPool(iterators.length);
try {
- for (int i = 0; i < sortDataRows.length; i++) {
+ for (int i = 0; i < iterators.length; i++) {
executorService.submit(
- new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
+ new SortIteratorThread(iterators[i], sortDataRow, sortParameters, batchSize));
}
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
+ processRowToNextStep(sortDataRow, sortParameters);
} catch (Exception e) {
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
@@ -121,9 +120,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
throw new CarbonDataLoadingException(e);
}
- //TODO get the batch size from CarbonProperties
- final int batchSize = 1000;
-
// Creates the iterator to read from merge sorter.
Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
@@ -151,6 +147,36 @@ public class ParallelReadMergeSorterImpl implements Sorter {
}
/**
+ * Below method will be used to process data to next step
+ */
+ private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
+ throws CarbonDataLoadingException {
+ if (null == sortDataRows) {
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ LOGGER.info("Number of Records was Zero");
+ String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+ LOGGER.info(logMessage);
+ return false;
+ }
+
+ try {
+ // start sorting
+ sortDataRows.startSorting();
+
+ // check any more rows are present
+ LOGGER.info("Record Processed For table: " + parameters.getTableName());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
+ System.currentTimeMillis());
+ return false;
+ } catch (CarbonSortKeyAndGroupByException e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ /**
* This thread iterates the iterator and adds the rows to @{@link SortDataRows}
*/
private static class SortIteratorThread implements Callable<Void> {
@@ -161,11 +187,14 @@ public class ParallelReadMergeSorterImpl implements Sorter {
private SortParameters parameters;
+ private Object[][] buffer;
+
public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
- SortParameters parameters) {
+ SortParameters parameters, int batchSize) {
this.iterator = iterator;
this.sortDataRows = sortDataRows;
this.parameters = parameters;
+ this.buffer = new Object[batchSize][];
}
@Override
@@ -174,15 +203,17 @@ public class ParallelReadMergeSorterImpl implements Sorter {
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+ int i = 0;
while (batchIterator.hasNext()) {
CarbonRow row = batchIterator.next();
if (row != null) {
- sortDataRows.addRow(row.getData());
+ buffer[i++] = row.getData();
}
}
+ if (i > 0) {
+ sortDataRows.addRowBatch(buffer, i);
+ }
}
-
- processRowToNextStep(sortDataRows);
} catch (Exception e) {
LOGGER.error(e);
throw new CarbonDataLoadingException(e);
@@ -190,34 +221,5 @@ public class ParallelReadMergeSorterImpl implements Sorter {
return null;
}
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(SortDataRows sortDataRows)
- throws CarbonDataLoadingException {
- if (null == sortDataRows) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- // start sorting
- sortDataRows.startSorting();
-
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
- System.currentTimeMillis());
- return false;
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 69bd84a..b979af6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -3,6 +3,11 @@ package org.apache.carbondata.processing.newflow.steps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
@@ -27,33 +32,35 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
private RowParser rowParser;
- private Iterator<Object[]>[] inputIterators;
+ private CarbonIterator<Object[]>[] inputIterators;
+
+ /**
+ * executor service to execute the query
+ */
+ public ExecutorService executorService;
public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
- Iterator<Object[]>[] inputIterators) {
+ CarbonIterator<Object[]>[] inputIterators) {
super(configuration, null);
this.inputIterators = inputIterators;
}
- @Override
- public DataField[] getOutput() {
+ @Override public DataField[] getOutput() {
return configuration.getDataFields();
}
- @Override
- public void initialize() throws CarbonDataLoadingException {
+ @Override public void initialize() throws CarbonDataLoadingException {
rowParser = new RowParserImpl(getOutput(), configuration);
+ executorService = Executors.newCachedThreadPool();
}
-
-
- @Override
- public Iterator<CarbonRowBatch>[] execute() {
+ @Override public Iterator<CarbonRowBatch>[] execute() {
int batchSize = CarbonProperties.getInstance().getBatchSize();
- List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+ List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
for (int i = 0; i < outIterators.length; i++) {
- outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize);
+ outIterators[i] =
+ new InputProcessorIterator(readerIterators[i], rowParser, batchSize, executorService);
}
return outIterators;
}
@@ -62,14 +69,14 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
* Partition input iterators equally as per the number of threads.
* @return
*/
- private List<Iterator<Object[]>>[] partitionInputReaderIterators() {
+ private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
// Get the number of cores configured in property.
int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
// Get the minimum of number of cores and iterators size to get the number of parallel threads
// to be launched.
int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
- List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+ List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
for (int i = 0; i < parallelThreadNumber; i++) {
iterators[i] = new ArrayList<>();
}
@@ -80,20 +87,26 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
return iterators;
}
- @Override
- protected CarbonRow processRow(CarbonRow row) {
+ @Override protected CarbonRow processRow(CarbonRow row) {
return null;
}
+ @Override public void close() {
+ executorService.shutdown();
+ for (CarbonIterator inputIterator : inputIterators) {
+ inputIterator.close();
+ }
+ }
+
/**
* This iterator wraps the list of iterators and it starts iterating the each
* iterator of the list one by one. It also parse the data while iterating it.
*/
private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
- private List<Iterator<Object[]>> inputIterators;
+ private List<CarbonIterator<Object[]>> inputIterators;
- private Iterator<Object[]> currentIterator;
+ private CarbonIterator<Object[]> currentIterator;
private int counter;
@@ -101,19 +114,28 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
private RowParser rowParser;
- public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
- RowParser rowParser, int batchSize) {
+ private Future<CarbonRowBatch> future;
+
+ private ExecutorService executorService;
+
+ private boolean nextBatch;
+
+ public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
+ RowParser rowParser, int batchSize, ExecutorService executorService) {
this.inputIterators = inputIterators;
this.batchSize = batchSize;
this.rowParser = rowParser;
this.counter = 0;
// Get the first iterator from the list.
currentIterator = inputIterators.get(counter++);
+ currentIterator.initialize();
+ this.executorService = executorService;
+ this.nextBatch = false;
}
@Override
public boolean hasNext() {
- return internalHasNext();
+ return nextBatch || internalHasNext();
}
private boolean internalHasNext() {
@@ -124,6 +146,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
if (counter < inputIterators.size()) {
// Get the next iterator from the list.
currentIterator = inputIterators.get(counter++);
+ // Initialize the new iterator
+ currentIterator.initialize();
hasNext = internalHasNext();
}
}
@@ -132,14 +156,39 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
@Override
public CarbonRowBatch next() {
- // Create batch and fill it.
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
- int count = 0;
- while (internalHasNext() && count < batchSize) {
- carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
- count++;
+ CarbonRowBatch result = null;
+ if (future == null) {
+ future = getCarbonRowBatch();
+ }
+ try {
+ result = future.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ nextBatch = false;
+ if (hasNext()) {
+ nextBatch = true;
+ future = getCarbonRowBatch();
}
- return carbonRowBatch;
+
+ return result;
+ }
+
+ private Future<CarbonRowBatch> getCarbonRowBatch() {
+ return executorService.submit(new Callable<CarbonRowBatch>() {
+ @Override public CarbonRowBatch call() throws Exception {
+ // Create batch and fill it.
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
+ int count = 0;
+ while (internalHasNext() && count < batchSize) {
+ carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+ count++;
+ }
+ return carbonRowBatch;
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 3a6afc7..7231775 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -74,6 +74,8 @@ public class SortDataRows {
private SortIntermediateFileMerger intermediateFileMerger;
+ private final Object addRowsLock = new Object();
+
public SortDataRows(SortParameters parameters,
SortIntermediateFileMerger intermediateFileMerger) {
this.parameters = parameters;
@@ -137,6 +139,44 @@ public class SortDataRows {
}
/**
+ * This method will be used to add new row
+ *
+ * @param rowBatch new rowBatch
+ * @throws CarbonSortKeyAndGroupByException problem while writing
+ */
+ public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+ // if record holder list size is equal to sort buffer size then it will
+ // sort the list and then write current list data to file
+ synchronized (addRowsLock) {
+ if (entryCount + size >= sortBufferSize) {
+ LOGGER.debug("************ Writing to temp file ********** ");
+ intermediateFileMerger.startMergingIfPossible();
+ Object[][] recordHolderListLocal = recordHolderList;
+ int sizeLeft = sortBufferSize - entryCount ;
+ if (sizeLeft > 0) {
+ System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+ }
+ try {
+ dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal));
+ } catch (Exception e) {
+ LOGGER.error(
+ "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
+ throw new CarbonSortKeyAndGroupByException(e);
+ }
+ // create the new holder Array
+ this.recordHolderList = new Object[this.sortBufferSize][];
+ this.entryCount = 0;
+ size = size - sizeLeft;
+ if (size == 0) {
+ return;
+ }
+ }
+ System.arraycopy(rowBatch, 0, recordHolderList, entryCount, size);
+ entryCount += size;
+ }
+ }
+
+ /**
* Below method will be used to start storing process This method will get
* all the temp files present in sort temp folder then it will create the
* record holder heap and then it will read first record from each file and
[2/2] incubator-carbondata git commit: [CARBONDATA-471] Optimized no-kettle flow and fixed issues in cluster. This closes #333
Posted by ja...@apache.org.
[CARBONDATA-471]Optimized no kettle flow and fixed issues in cluster This closes #333
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e7e370ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e7e370ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e7e370ca
Branch: refs/heads/master
Commit: e7e370cac28d3db35c8e5ec6ab4d3fb62af749f4
Parents: f47bbc2 63434fa
Author: jackylk <ja...@huawei.com>
Authored: Fri Dec 2 16:08:14 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 2 16:08:14 2016 +0800
----------------------------------------------------------------------
.../carbondata/common/CarbonIterator.java | 14 +++
.../AbstractDetailQueryResultIterator.java | 2 +-
.../carbondata/hadoop/csv/CSVInputFormat.java | 42 +++++---
.../recorditerator/RecordReaderIterator.java | 31 +++++-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 40 ++++---
.../processing/iterator/CarbonIterator.java | 38 -------
.../processing/newflow/DataLoadExecutor.java | 5 +-
.../newflow/DataLoadProcessBuilder.java | 8 +-
.../sort/impl/ParallelReadMergeSorterImpl.java | 90 ++++++++--------
.../newflow/steps/InputProcessorStepImpl.java | 105 ++++++++++++++-----
.../sortandgroupby/sortdata/SortDataRows.java | 40 +++++++
11 files changed, 268 insertions(+), 147 deletions(-)
----------------------------------------------------------------------