You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:24 UTC
[2/5] incubator-carbondata git commit: Data load integration of all
steps for removing kettle
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 78b9290..e0a7ceb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -30,10 +30,12 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.converter.FieldConverter;
import org.apache.carbondata.processing.newflow.converter.RowConverter;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
/**
* It converts the complete row if necessary, dictionary columns are encoded with dictionary values
@@ -43,14 +45,30 @@ public class RowConverterImpl implements RowConverter {
private CarbonDataLoadConfiguration configuration;
+ private DataField[] fields;
+
private FieldConverter[] fieldConverters;
- public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration) {
+ private BadRecordsLogger badRecordLogger;
+
+ private BadRecordLogHolder logHolder;
+
+ public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+ BadRecordsLogger badRecordLogger) {
+ this.fields = fields;
this.configuration = configuration;
+ this.badRecordLogger = badRecordLogger;
+ }
+
+ @Override
+ public void initialize() {
CacheProvider cacheProvider = CacheProvider.getInstance();
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
configuration.getTableIdentifier().getStorePath());
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
List<FieldConverter> fieldConverterList = new ArrayList<>();
long lruCacheStartTime = System.currentTimeMillis();
@@ -58,20 +76,27 @@ public class RowConverterImpl implements RowConverter {
for (int i = 0; i < fields.length; i++) {
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i);
- if (fieldConverter != null) {
- fieldConverterList.add(fieldConverter);
- }
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+ fieldConverterList.add(fieldConverter);
}
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
.recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+ logHolder = new BadRecordLogHolder();
}
@Override
public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+ CarbonRow copy = row.getCopy();
for (int i = 0; i < fieldConverters.length; i++) {
- fieldConverters[i].convert(row);
+ fieldConverters[i].convert(row, logHolder);
+ if (logHolder.isBadRecordNotAdded()) {
+ badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
+ logHolder.clear();
+ if(badRecordLogger.isBadRecordConvertNullDisable()) {
+ return null;
+ }
+ }
}
return row;
}
@@ -81,8 +106,8 @@ public class RowConverterImpl implements RowConverter {
List<Integer> dimCardinality = new ArrayList<>();
for (int i = 0; i < fieldConverters.length; i++) {
if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
- dimCardinality.add(
- ((AbstractDictionaryFieldConverterImpl) fieldConverters[i]).getColumnCardinality());
+ ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+ .fillColumnCardinality(dimCardinality);
}
}
int[] cardinality = new int[dimCardinality.size()];
@@ -93,4 +118,13 @@ public class RowConverterImpl implements RowConverter {
configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality);
}
+ @Override
+ public RowConverter createCopyForNewThread() {
+ RowConverterImpl converter =
+ new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
+ converter.fieldConverters = fieldConverters;
+ converter.logHolder = new BadRecordLogHolder();
+ return converter;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
new file mode 100644
index 0000000..0c15c4b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.dictionary;
+
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+
+/**
+ * It is used for generating dictionary from value itself, like timestamp can be used directly as
+ * dictionary.
+ */
+public class DirectDictionary implements BiDictionary<Integer, Object> {
+
+ private DirectDictionaryGenerator dictionaryGenerator;
+
+ public DirectDictionary(DirectDictionaryGenerator dictionaryGenerator) {
+ this.dictionaryGenerator = dictionaryGenerator;
+ }
+
+ @Override
+ public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+ Integer key = getKey(value);
+ if (key == null) {
+ throw new UnsupportedOperationException("trying to add new entry in DirectDictionary");
+ }
+ return key;
+ }
+
+ @Override
+ public Integer getKey(Object value) {
+ return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
+ }
+
+ @Override
+ public Object getValue(Integer key) {
+ return dictionaryGenerator.getValueFromSurrogate(key);
+ }
+
+ @Override public int size() {
+ return Integer.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
index f807a81..3f0a9f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-public class PreCreatedDictionary implements BiDictionary<Integer, String> {
+public class PreCreatedDictionary implements BiDictionary<Integer, Object> {
private Dictionary dictionary;
@@ -32,7 +32,7 @@ public class PreCreatedDictionary implements BiDictionary<Integer, String> {
}
@Override
- public Integer getOrGenerateKey(String value) throws DictionaryGenerationException {
+ public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
Integer key = getKey(value);
if (key == null) {
throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary");
@@ -41,8 +41,8 @@ public class PreCreatedDictionary implements BiDictionary<Integer, String> {
}
@Override
- public Integer getKey(String value) {
- return dictionary.getSurrogateKey(value);
+ public Integer getKey(Object value) {
+ return dictionary.getSurrogateKey(value.toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
new file mode 100644
index 0000000..7c1126e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.exception;
+
+public class BadRecordFoundException extends Exception {
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public BadRecordFoundException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public BadRecordFoundException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param t
+ */
+ public BadRecordFoundException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
index fcf122e..a0139f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
@@ -30,42 +30,47 @@ public final class CarbonParserFactory {
/**
* Create parser for the carbon column.
+ *
* @param carbonColumn
* @param complexDelimiters
* @return
*/
- public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
- return createParser(carbonColumn, complexDelimiters, 0);
+ public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+ String nullFormat) {
+ return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
}
/**
* This method may be called recursively if the carbon column is complex type.
+ *
* @param carbonColumn
* @param complexDelimiters, these delimiters which are used to separate the complex data types.
- * @param depth It is like depth of tree, if column has children then depth is 1, And depth
- * becomes 2 if children has children. This depth is used select the complex
- * delimiters
+ * @param depth It is like depth of tree, if column has children then depth is 1,
+ * And depth becomes 2 if children has children.
+ * This depth is used to select the complex
+ * delimiters
* @return GenericParser
*/
private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
- int depth) {
+ String nullFormat, int depth) {
switch (carbonColumn.getDataType()) {
case ARRAY:
List<CarbonDimension> listOfChildDimensions =
((CarbonDimension) carbonColumn).getListOfChildDimensions();
// Create array parser with complex delimiter
- ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth]);
+ ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
for (CarbonDimension dimension : listOfChildDimensions) {
- arrayParser.addChildren(createParser(dimension, complexDelimiters, depth + 1));
+ arrayParser
+ .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
}
return arrayParser;
case STRUCT:
List<CarbonDimension> dimensions =
((CarbonDimension) carbonColumn).getListOfChildDimensions();
// Create struct parser with complex delimiter
- StructParserImpl parser = new StructParserImpl(complexDelimiters[depth]);
+ StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
for (CarbonDimension dimension : dimensions) {
- parser.addChildren(createParser(dimension, complexDelimiters, depth + 1));
+ parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
}
return parser;
case MAP:
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
index bdc13ab..e500bf9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
@@ -22,7 +22,7 @@ package org.apache.carbondata.processing.newflow.parser;
* Parse the data according to implementation, The implementation classes can be struct, array or
* map datatypes.
* It remains thread safe as the state of implementation class should not change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling @{@link GenericParser#parse(Object)} method
*/
public interface GenericParser<E> {
@@ -31,6 +31,6 @@ public interface GenericParser<E> {
* @param data
* @return
*/
- E parse(String data);
+ E parse(Object data);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
index 7fc95e7..93ee4d4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
@@ -18,8 +18,6 @@
*/
package org.apache.carbondata.processing.newflow.parser.impl;
-import java.util.ArrayList;
-import java.util.List;
import java.util.regex.Pattern;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -28,33 +26,38 @@ import org.apache.carbondata.processing.newflow.parser.ComplexParser;
import org.apache.carbondata.processing.newflow.parser.GenericParser;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
/**
* It parses the string to @{@link ArrayObject} using delimiter.
* It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling @{@link GenericParser#parse(Object)} method
*/
public class ArrayParserImpl implements ComplexParser<ArrayObject> {
private Pattern pattern;
- private List<GenericParser> children = new ArrayList<>();
+ private GenericParser child;
- public ArrayParserImpl(String delimiter) {
+ private String nullFormat;
+
+ public ArrayParserImpl(String delimiter, String nullFormat) {
pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+ this.nullFormat = nullFormat;
}
@Override
- public ArrayObject parse(String data) {
- if (StringUtils.isNotEmpty(data)) {
- String[] split = pattern.split(data, -1);
- if (ArrayUtils.isNotEmpty(split)) {
- Object[] array = new Object[children.size()];
- for (int i = 0; i < children.size(); i++) {
- array[i] = children.get(i).parse(split[i]);
+ public ArrayObject parse(Object data) {
+ if (data != null) {
+ String value = data.toString();
+ if (!value.isEmpty() && !value.equals(nullFormat)) {
+ String[] split = pattern.split(value, -1);
+ if (ArrayUtils.isNotEmpty(split)) {
+ Object[] array = new Object[split.length];
+ for (int i = 0; i < split.length; i++) {
+ array[i] = child.parse(split[i]);
+ }
+ return new ArrayObject(array);
}
- return new ArrayObject(array);
}
}
return null;
@@ -62,6 +65,6 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
@Override
public void addChildren(GenericParser parser) {
- children.add(parser);
+ child = parser;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
index a7c08b5..a9940ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.processing.newflow.parser.GenericParser;
public class PrimitiveParserImpl implements GenericParser<Object> {
@Override
- public Object parse(String data) {
+ public Object parse(Object data) {
return data;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
index 9c1edd1..c438f89 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
@@ -18,6 +18,10 @@
*/
package org.apache.carbondata.processing.newflow.parser.impl;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
import org.apache.carbondata.processing.newflow.parser.GenericParser;
import org.apache.carbondata.processing.newflow.parser.RowParser;
@@ -25,15 +29,69 @@ public class RowParserImpl implements RowParser {
private GenericParser[] genericParsers;
- public RowParserImpl(GenericParser[] genericParsers) {
- this.genericParsers = genericParsers;
+ private int[] outputMapping;
+
+ private int[] inputMapping;
+
+ private int numberOfColumns;
+
+ public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
+ String[] complexDelimiters =
+ (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ DataField[] input = getInput(configuration);
+ genericParsers = new GenericParser[input.length];
+ for (int i = 0; i < genericParsers.length; i++) {
+ genericParsers[i] =
+ CarbonParserFactory.createParser(input[i].getColumn(), complexDelimiters, nullFormat);
+ }
+ outputMapping = new int[output.length];
+ for (int i = 0; i < input.length; i++) {
+ for (int j = 0; j < output.length; j++) {
+ if (input[i].getColumn().equals(output[j].getColumn())) {
+ outputMapping[i] = j;
+ break;
+ }
+ }
+ }
+ }
+
+ public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
+ DataField[] fields = configuration.getDataFields();
+ String[] header = configuration.getHeader();
+ numberOfColumns = header.length;
+ DataField[] input = new DataField[fields.length];
+ inputMapping = new int[input.length];
+ int k = 0;
+ for (int i = 0; i < numberOfColumns; i++) {
+ for (int j = 0; j < fields.length; j++) {
+ if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
+ input[k] = fields[j];
+ inputMapping[k] = i;
+ k++;
+ break;
+ }
+ }
+ }
+ return input;
}
@Override
public Object[] parseRow(Object[] row) {
- for (int i = 0; i < row.length; i++) {
- row[i] = genericParsers[i].parse(row[i].toString());
+ // If number of columns are less in a row then create new array with same size of header.
+ if (row.length < numberOfColumns) {
+ String[] temp = new String[numberOfColumns];
+ System.arraycopy(row, 0, temp, 0, row.length);
+ row = temp;
}
- return row;
+ Object[] out = new Object[genericParsers.length];
+ for (int i = 0; i < genericParsers.length; i++) {
+ Object obj = row[inputMapping[i]];
+ out[outputMapping[i]] = genericParsers[i].parse(obj);
+ }
+ return out;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
index b38a2e4..e83c079 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
@@ -28,12 +28,11 @@ import org.apache.carbondata.processing.newflow.parser.ComplexParser;
import org.apache.carbondata.processing.newflow.parser.GenericParser;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
/**
* It parses the string to @{@link StructObject} using delimiter.
* It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling @{@link GenericParser#parse(Object)} method
*/
public class StructParserImpl implements ComplexParser<StructObject> {
@@ -41,20 +40,26 @@ public class StructParserImpl implements ComplexParser<StructObject> {
private List<GenericParser> children = new ArrayList<>();
- public StructParserImpl(String delimiter) {
+ private String nullFormat;
+
+ public StructParserImpl(String delimiter, String nullFormat) {
pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+ this.nullFormat = nullFormat;
}
@Override
- public StructObject parse(String data) {
- if (StringUtils.isNotEmpty(data)) {
- String[] split = pattern.split(data, -1);
- if (ArrayUtils.isNotEmpty(split)) {
- Object[] array = new Object[children.size()];
- for (int i = 0; i < children.size(); i++) {
- array[i] = children.get(i).parse(split[i]);
+ public StructObject parse(Object data) {
+ if (data != null) {
+ String value = data.toString();
+ if (!value.isEmpty() && !value.equals(nullFormat)) {
+ String[] split = pattern.split(value, -1);
+ if (ArrayUtils.isNotEmpty(split)) {
+ Object[] array = new Object[children.size()];
+ for (int i = 0; i < split.length && i < array.length; i++) {
+ array[i] = children.get(i).parse(split[i]);
+ }
+ return new StructObject(array);
}
- return new StructObject(array);
}
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index 83b07c9..68b87a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -20,6 +20,7 @@
package org.apache.carbondata.processing.newflow.row;
import java.math.BigDecimal;
+import java.util.Arrays;
/**
* This row class is used to transfer the row data from one step to other step
@@ -36,6 +37,10 @@ public class CarbonRow {
return data;
}
+ public void setData(Object[] data) {
+ this.data = data;
+ }
+
public int getInt(int ordinal) {
return (int) data[ordinal];
}
@@ -72,12 +77,21 @@ public class CarbonRow {
return (Object[]) data[ordinal];
}
- public Integer[] getIntegerArray(int ordinal) {
- return (Integer[]) data[ordinal];
+ public int[] getIntArray(int ordinal) {
+ return (int[]) data[ordinal];
}
public void update(Object value, int ordinal) {
data[ordinal] = value;
}
+ public CarbonRow getCopy() {
+ Object[] copy = new Object[data.length];
+ System.arraycopy(data, 0, copy, 0, copy.length);
+ return new CarbonRow(copy);
+ }
+
+ @Override public String toString() {
+ return Arrays.toString(data);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 0e575bc..cd487ec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -81,10 +81,10 @@ public class ParallelReadMergeSorterImpl implements Sorter {
storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
finalMerger =
new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
- sortParameters.getDimColCount() - sortParameters.getComplexDimColCount(),
+ sortParameters.getDimColCount(),
sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
- sortParameters.getNoDictionaryDimnesionColumn());
+ sortParameters.getNoDictionaryDimnesionColumn(), sortParameters.isUseKettle());
}
@Override
@@ -102,21 +102,14 @@ public class ParallelReadMergeSorterImpl implements Sorter {
}
this.executorService = Executors.newFixedThreadPool(iterators.length);
- // First prepare the data for sort.
- Iterator<CarbonRowBatch>[] sortPrepIterators = new Iterator[iterators.length];
- for (int i = 0; i < sortPrepIterators.length; i++) {
- sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields);
- }
-
- for (int i = 0; i < sortDataRows.length; i++) {
- executorService
- .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters));
- }
-
try {
+ for (int i = 0; i < sortDataRows.length; i++) {
+ executorService.submit(
+ new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
+ }
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.DAYS);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
}
try {
@@ -182,12 +175,15 @@ public class ParallelReadMergeSorterImpl implements Sorter {
CarbonRowBatch batch = iterator.next();
Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
while (batchIterator.hasNext()) {
- sortDataRows.addRow(batchIterator.next().getData());
+ CarbonRow row = batchIterator.next();
+ if (row != null) {
+ sortDataRows.addRow(row.getData());
+ }
}
}
processRowToNextStep(sortDataRows);
- } catch (CarbonSortKeyAndGroupByException e) {
+ } catch (Exception e) {
LOGGER.error(e);
throw new CarbonDataLoadingException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
deleted file mode 100644
index 9c4305a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-/**
- * This iterator transform the row to how carbon sorter interface expects it.
- * TODO : It supposed to return a comparable ROW which can sort the row.
- */
-public class SortPreparatorIterator extends CarbonIterator<CarbonRowBatch> {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private int[] dictionaryFieldIndexes;
-
- private int[] nonDictionaryFieldIndexes;
-
- private int[] measueFieldIndexes;
-
- private int dimIndexInRow = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
- private int byteArrayIndexInRow = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
- private int measureIndexInRow = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
- public SortPreparatorIterator(Iterator<CarbonRowBatch> iterator, DataField[] dataFields) {
- this.iterator = iterator;
- List<Integer> dictIndexes = new ArrayList<>();
- List<Integer> nonDictIndexes = new ArrayList<>();
- List<Integer> msrIndexes = new ArrayList<>();
- for (int i = 0; i < dataFields.length; i++) {
- if (dataFields[i].getColumn().isDimesion()) {
- if (dataFields[i].hasDictionaryEncoding()) {
- dictIndexes.add(i);
- } else {
- nonDictIndexes.add(i);
- }
- } else {
- msrIndexes.add(i);
- }
- }
- dictionaryFieldIndexes =
- ArrayUtils.toPrimitive(dictIndexes.toArray(new Integer[dictIndexes.size()]));
- nonDictionaryFieldIndexes =
- ArrayUtils.toPrimitive(nonDictIndexes.toArray(new Integer[nonDictIndexes.size()]));
- measueFieldIndexes = ArrayUtils.toPrimitive(msrIndexes.toArray(new Integer[msrIndexes.size()]));
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public CarbonRowBatch next() {
- CarbonRowBatch batch = iterator.next();
- Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
- while (batchIterator.hasNext()) {
- Object[] outputArray = new Object[3];
- CarbonRow row = batchIterator.next();
- fillDictionaryArrayFromRow(row, outputArray);
- fillNonDictionaryArrayFromRow(row, outputArray);
- fillMeasureArrayFromRow(row, outputArray);
- }
- return batch;
- }
-
- /**
- * Collect all dictionary values to single integer array and store it in 0 index of out put array.
- *
- * @param row
- * @param outputArray
- */
- private void fillDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
- if (dictionaryFieldIndexes.length > 0) {
- int[] dictArray = new int[dictionaryFieldIndexes.length];
- for (int i = 0; i < dictionaryFieldIndexes.length; i++) {
- dictArray[i] = row.getInt(dictionaryFieldIndexes[i]);
- }
- outputArray[dimIndexInRow] = dictArray;
- }
- }
-
- /**
- * collect all non dictionary columns and compose it to single byte array and store it in 1 index
- * of out put array
- *
- * @param row
- * @param outputArray
- */
- private void fillNonDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
- if (nonDictionaryFieldIndexes.length > 0) {
- byte[][] nonDictByteArray = new byte[nonDictionaryFieldIndexes.length][];
- for (int i = 0; i < nonDictByteArray.length; i++) {
- nonDictByteArray[i] = row.getBinary(nonDictionaryFieldIndexes[i]);
- }
-
- byte[] nonDictionaryCols =
- RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(nonDictByteArray);
- outputArray[byteArrayIndexInRow] = nonDictionaryCols;
- }
- }
-
- /**
- * Collect all measure values as array and store it in 2 index of out put array.
- *
- * @param row
- * @param outputArray
- */
- private void fillMeasureArrayFromRow(CarbonRow row, Object[] outputArray) {
- if (measueFieldIndexes.length > 0) {
- Object[] measureArray = new Object[measueFieldIndexes.length];
- for (int i = 0; i < measueFieldIndexes.length; i++) {
- measureArray[i] = row.getObject(measueFieldIndexes[i]);
- }
- outputArray[measureIndexInRow] = measureArray;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 582b8c1..1c59d29 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -19,13 +19,24 @@
package org.apache.carbondata.processing.newflow.steps;
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.constants.LoggerAction;
import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.newflow.converter.RowConverter;
import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
/**
* Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -33,7 +44,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
*/
public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
- private RowConverter encoder;
+ private RowConverter converter;
public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
@@ -47,20 +58,105 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
  @Override
  public void initialize() throws CarbonDataLoadingException {
    // Initialize the child step first so that its output schema is available below.
    child.initialize();
    BadRecordsLogger badRecordLogger = createBadRecordLogger();
    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
    converter.initialize();
  }
+
+ /**
+ * Create the iterator using child iterator.
+ *
+ * @param childIter
+ * @return new iterator with step specific processing.
+ */
+ @Override
+ protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+ return new CarbonIterator<CarbonRowBatch>() {
+ RowConverter localConverter = converter.createCopyForNewThread();
+ @Override public boolean hasNext() {
+ return childIter.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ return processRowBatch(childIter.next(), localConverter);
+ }
+ };
+ }
+
+ /**
+ * Process the batch of rows as per the step logic.
+ *
+ * @param rowBatch
+ * @return processed row.
+ */
+ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+ CarbonRowBatch newBatch = new CarbonRowBatch();
+ Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+ while (batchIterator.hasNext()) {
+ newBatch.addRow(localConverter.convert(batchIterator.next()));
+ }
+ return newBatch;
}
  @Override
  protected CarbonRow processRow(CarbonRow row) {
    // Conversion happens batch-wise in processRowBatch; per-row processing is unsupported here.
    throw new UnsupportedOperationException();
  }
+
+ private BadRecordsLogger createBadRecordLogger() {
+ boolean badRecordsLogRedirect = false;
+ boolean badRecordConvertNullDisable = false;
+ boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+ .toString());
+ Object bad_records_action =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+ .toString();
+ if (null != bad_records_action) {
+ LoggerAction loggerAction = null;
+ try {
+ loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ loggerAction = LoggerAction.FORCE;
+ }
+ switch (loggerAction) {
+ case FORCE:
+ badRecordConvertNullDisable = false;
+ break;
+ case REDIRECT:
+ badRecordsLogRedirect = true;
+ badRecordConvertNullDisable = true;
+ break;
+ case IGNORE:
+ badRecordsLogRedirect = false;
+ badRecordConvertNullDisable = true;
+ break;
+ }
+ }
+ CarbonTableIdentifier identifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+ identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
+ identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
+ + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable,
+ badRecordConvertNullDisable);
+ return badRecordsLogger;
+ }
+
+ private String getBadLogStoreLocation(String storeLocation) {
+ String badLogStoreLocation =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+ badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+ return badLogStoreLocation;
}
@Override
public void close() {
super.close();
- if (encoder != null) {
- encoder.finish();
+ if (converter != null) {
+ converter.finish();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
new file mode 100644
index 0000000..48492d3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.constants.IgnoreDictionary;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from sorted files which are generated in previous sort step.
+ * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
+ */
+public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
+
+ private SegmentProperties segmentProperties;
+
+ private KeyGenerator keyGenerator;
+
+ private CarbonFactHandler dataHandler;
+
+ private int noDictionaryCount;
+
+ private int complexDimensionCount;
+
+ private int measureCount;
+
+ private long readCounter;
+
+ private long writeCounter;
+
+ private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
+
+ private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
+
+ private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
+
+ private String storeLocation;
+
+ public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override
+ public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override
+ public void initialize() throws CarbonDataLoadingException {
+ child.initialize();
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+
+ storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
+ configuration.getPartitionId(), configuration.getSegmentId() + "", false);
+
+ if (!(new File(storeLocation).mkdirs())) {
+ LOGGER.error("Local data load folder location does not exist: " + storeLocation);
+ return;
+ }
+ }
+
+ @Override
+ public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+ Iterator<CarbonRowBatch>[] iterators = child.execute();
+ String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+ try {
+ CarbonFactDataHandlerModel dataHandlerModel =
+ CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, storeLocation);
+ noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
+ complexDimensionCount = configuration.getComplexDimensionCount();
+ measureCount = dataHandlerModel.getMeasureCount();
+ segmentProperties = dataHandlerModel.getSegmentProperties();
+ keyGenerator = segmentProperties.getDimensionKeyGenerator();
+ dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
+ CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+ dataHandler.initialise();
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ for (Iterator<CarbonRowBatch> iterator : iterators) {
+ while (iterator.hasNext()) {
+ processBatch(iterator.next());
+ }
+ }
+
+ } catch (CarbonDataWriterException e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+ throw new CarbonDataLoadingException(
+ "Error while initializing data handler : " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+ try {
+ dataHandler.finish();
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
+ }
+ LOGGER.info("Record Processed For table: " + tableName);
+ String logMessage =
+ "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
+ + writeCounter;
+ LOGGER.info(logMessage);
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
+ processingComplete();
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ }
+
+ private void processingComplete() throws CarbonDataLoadingException {
+ if (null != dataHandler) {
+ try {
+ dataHandler.closeHandler();
+ } catch (CarbonDataWriterException e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonDataLoadingException(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+ }
+ }
+ }
+
+ private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
+ Iterator<CarbonRow> iterator = batch.getBatchIterator();
+ try {
+ while (iterator.hasNext()) {
+ CarbonRow row = iterator.next();
+ readCounter++;
+ Object[] outputRow;
+ // adding one for the high cardinality dims byte array.
+ if (noDictionaryCount > 0 || complexDimensionCount > 0) {
+ outputRow = new Object[measureCount + 1 + 1];
+ } else {
+ outputRow = new Object[measureCount + 1];
+ }
+
+ int l = 0;
+ int index = 0;
+ Object[] measures = row.getObjectArray(measureIndex);
+ for (int i = 0; i < measureCount; i++) {
+ outputRow[l++] = measures[index++];
+ }
+ outputRow[l] = row.getObject(noDimByteArrayIndex);
+
+ int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
+ int[] dimsArray = row.getIntArray(dimsArrayIndex);
+ for (int i = 0; i < highCardExcludedRows.length; i++) {
+ highCardExcludedRows[i] = dimsArray[i];
+ }
+
+ outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows);
+ dataHandler.addDataToStore(outputRow);
+ writeCounter++;
+ }
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+ }
+ }
+
+ @Override
+ protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
new file mode 100644
index 0000000..044665c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
@@ -0,0 +1,84 @@
+package org.apache.carbondata.processing.newflow.steps;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * DummyClassForTest
+ */
+public class DummyClassForTest extends AbstractDataLoadProcessorStep {
+
+ private ExecutorService executorService;
+
+ public DummyClassForTest(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override public void initialize() throws CarbonDataLoadingException {
+
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+ Iterator<CarbonRowBatch>[] iterators = child.execute();
+ this.executorService = Executors.newFixedThreadPool(iterators.length);
+
+ try {
+ for (int i = 0; i < iterators.length; i++) {
+ executorService.submit(new DummyThread(iterators[i]));
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.DAYS);
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+ }
+ return null;
+ }
+
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+}
+
+/**
+ * This thread iterates the iterator
+ */
+class DummyThread implements Callable<Void> {
+
+ private Iterator<CarbonRowBatch> iterator;
+
+ public DummyThread(Iterator<CarbonRowBatch> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override public Void call() throws CarbonDataLoadingException {
+ try {
+ while (iterator.hasNext()) {
+ CarbonRowBatch batch = iterator.next();
+ Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+ while (batchIterator.hasNext()) {
+ CarbonRow row = batchIterator.next();
+ // do nothing
+ }
+ }
+
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 02b7fee..69bd84a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -11,10 +11,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
import org.apache.carbondata.processing.newflow.parser.RowParser;
import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -28,42 +25,24 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
private static final LogService LOGGER =
LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
- private GenericParser[] genericParsers;
+ private RowParser rowParser;
- private List<Iterator<Object[]>> inputIterators;
+ private Iterator<Object[]>[] inputIterators;
  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
      Iterator<Object[]>[] inputIterators) {
    // First step in the pipeline: it has no child step.
    super(configuration, null);
    this.inputIterators = inputIterators;
  }
  @Override
  public DataField[] getOutput() {
    // The input step emits rows in exactly the configured field order.
    return configuration.getDataFields();
  }
  @Override
  public void initialize() throws CarbonDataLoadingException {
    // The row parser converts raw Object[] input into the configured field layout.
    rowParser = new RowParserImpl(getOutput(), configuration);
  }
@@ -74,7 +53,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
for (int i = 0; i < outIterators.length; i++) {
- outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
+ outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize);
}
return outIterators;
}
@@ -88,15 +67,15 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
// Get the minimum of number of cores and iterators size to get the number of parallel threads
// to be launched.
- int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores);
+ int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
for (int i = 0; i < parallelThreadNumber; i++) {
iterators[i] = new ArrayList<>();
}
// Equally partition the iterators as per number of threads
- for (int i = 0; i < inputIterators.size(); i++) {
- iterators[i % parallelThreadNumber].add(inputIterators.get(i));
+ for (int i = 0; i < inputIterators.length; i++) {
+ iterators[i % parallelThreadNumber].add(inputIterators[i]);
}
return iterators;
}
@@ -123,10 +102,10 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
private RowParser rowParser;
public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
- GenericParser[] genericParsers, int batchSize) {
+ RowParser rowParser, int batchSize) {
this.inputIterators = inputIterators;
this.batchSize = batchSize;
- this.rowParser = new RowParserImpl(genericParsers);
+ this.rowParser = rowParser;
this.counter = 0;
// Get the first iterator from the list.
currentIterator = inputIterators.get(counter++);
@@ -145,8 +124,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
if (counter < inputIterators.size()) {
// Get the next iterator from the list.
currentIterator = inputIterators.get(counter++);
+ hasNext = internalHasNext();
}
- hasNext = internalHasNext();
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index aae6dae..19d099b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -50,6 +50,7 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
@Override
public void initialize() throws CarbonDataLoadingException {
+ child.initialize();
SortParameters sortParameters = SortParameters.createSortParameters(configuration);
sorter = new ParallelReadMergeSorterImpl(child.getOutput());
sorter.initialize(sortParameters);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
deleted file mode 100644
index 51f5ec4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.newflow.steps.writer;
-
-import java.io.File;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from sorted files which are generated in previous sort step.
- * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
- */
-public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
-
- private String storeLocation;
-
-
- private SegmentProperties segmentProperties;
-
- private KeyGenerator keyGenerator;
-
- private CarbonFactHandler dataHandler;
-
- private int noDictionaryCount;
-
- private int complexDimensionCount;
-
- private int measureCount;
-
- private long readCounter;
-
- private long writeCounter;
-
- private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
- private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
- private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
- public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
- AbstractDataLoadProcessorStep child) {
- super(configuration, child);
- }
-
- @Override
- public DataField[] getOutput() {
- return child.getOutput();
- }
-
- @Override
- public void initialize() throws CarbonDataLoadingException {
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
-
- String storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
- configuration.getPartitionId(), configuration.getSegmentId() + "", false);
-
- if (!(new File(storeLocation).exists())) {
- LOGGER.error("Local data load folder location does not exist: " + storeLocation);
- return;
- }
-
- }
-
- @Override
- public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
- Iterator<CarbonRowBatch>[] iterators = child.execute();
- String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
- try {
- CarbonFactDataHandlerModel dataHandlerModel =
- CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration);
- noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
- complexDimensionCount = configuration.getComplexDimensionCount();
- measureCount = dataHandlerModel.getMeasureCount();
- segmentProperties = dataHandlerModel.getSegmentProperties();
- keyGenerator = segmentProperties.getDimensionKeyGenerator();
- dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
- CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
- dataHandler.initialise();
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
- System.currentTimeMillis());
- for (int i = 0; i < iterators.length; i++) {
- Iterator<CarbonRowBatch> iterator = iterators[i];
- while (iterator.hasNext()) {
- processBatch(iterator.next());
- }
- }
-
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
- throw new CarbonDataLoadingException(
- "Error while initializing data handler : " + e.getMessage());
- } catch (Exception e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
- } finally {
- try {
- dataHandler.finish();
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
- } catch (Exception e) {
- LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
- }
- }
- LOGGER.info("Record Procerssed For table: " + tableName);
- String logMessage =
- "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
- + writeCounter;
- LOGGER.info(logMessage);
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
- processingComplete();
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
- System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
-
- return null;
- }
-
- @Override
- public void close() {
-
- }
-
- private void processingComplete() throws CarbonDataLoadingException {
- if (null != dataHandler) {
- try {
- dataHandler.closeHandler();
- } catch (CarbonDataWriterException e) {
- LOGGER.error(e, e.getMessage());
- throw new CarbonDataLoadingException(e.getMessage());
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
- }
- }
- }
-
- private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
- Iterator<CarbonRow> iterator = batch.getBatchIterator();
- try {
- while (iterator.hasNext()) {
- CarbonRow row = iterator.next();
- readCounter++;
- Object[] outputRow = null;
- // adding one for the high cardinality dims byte array.
- if (noDictionaryCount > 0 || complexDimensionCount > 0) {
- outputRow = new Object[measureCount + 1 + 1];
- } else {
- outputRow = new Object[measureCount + 1];
- }
-
- int l = 0;
- int index = 0;
- Object[] measures = row.getObjectArray(measureIndex);
- for (int i = 0; i < measureCount; i++) {
- outputRow[l++] = measures[index++];
- }
- outputRow[l] = row.getBinary(noDimByteArrayIndex);
-
- int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
- Integer[] dimsArray = row.getIntegerArray(dimsArrayIndex);
- for (int i = 0; i < highCardExcludedRows.length; i++) {
- highCardExcludedRows[i] = dimsArray[i];
- }
-
- outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows);
- dataHandler.addDataToStore(outputRow);
- writeCounter++;
- }
- } catch (Exception e) {
- throw new CarbonDataLoadingException("unable to generate the mdkey", e);
- }
- }
-
- @Override
- protected CarbonRow processRow(CarbonRow row) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index cffc00b..f1b062e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -90,6 +90,10 @@ public class IntermediateFileMerger implements Callable<Void> {
private File outPutFile;
+ private boolean useKettle;
+
+ private boolean[] noDictionarycolumnMapping;
+
/**
* IntermediateFileMerger Constructor
*/
@@ -99,6 +103,8 @@ public class IntermediateFileMerger implements Callable<Void> {
this.fileCounter = intermediateFiles.length;
this.intermediateFiles = intermediateFiles;
this.outPutFile = outPutFile;
+ this.useKettle = mergerParameters.isUseKettle();
+ noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
}
@Override public Void call() throws Exception {
@@ -108,9 +114,14 @@ public class IntermediateFileMerger implements Callable<Void> {
try {
startSorting();
initialize();
-
- while (hasNext()) {
- writeDataTofile(next());
+ if (useKettle) {
+ while (hasNext()) {
+ writeDataTofile(next());
+ }
+ } else {
+ while (hasNext()) {
+ writeDataTofileWithOutKettle(next());
+ }
}
if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
if (entryCount > 0) {
@@ -252,7 +263,8 @@ public class IntermediateFileMerger implements Callable<Void> {
new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
- mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
+ mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+ mergerParameters.isUseKettle());
// initialize
sortTempFileChunkHolder.initialize();
@@ -300,6 +312,8 @@ public class IntermediateFileMerger implements Callable<Void> {
/**
* Below method will be used to write data to file
*
+ * TODO Remove it after kettle is removed
+ *
* @throws CarbonSortKeyAndGroupByException problem while writing
*/
private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
@@ -362,6 +376,74 @@ public class IntermediateFileMerger implements Callable<Void> {
}
}
+ /**
+ * Below method will be used to write data to file
+ *
+ * @throws CarbonSortKeyAndGroupByException problem while writing
+ */
+ private void writeDataTofileWithOutKettle(Object[] row) throws CarbonSortKeyAndGroupByException {
+ if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
+ if (entryCount == 0) {
+ records = new Object[totalSize][];
+ records[entryCount++] = row;
+ return;
+ }
+
+ records[entryCount++] = row;
+ if (entryCount == totalSize) {
+ this.writer.writeSortTempFile(records);
+ entryCount = 0;
+ records = new Object[totalSize][];
+ }
+ return;
+ }
+ try {
+ char[] aggType = mergerParameters.getAggType();
+ int[] mdkArray = (int[]) row[0];
+ byte[][] nonDictArray = (byte[][]) row[1];
+ int mdkIndex = 0;
+ int nonDictKeyIndex = 0;
+ // write dictionary and non dictionary dimensions here.
+ for (boolean nodictinary : noDictionarycolumnMapping) {
+ if (nodictinary) {
+ byte[] col = nonDictArray[nonDictKeyIndex++];
+ stream.writeShort(col.length);
+ stream.write(col);
+ } else {
+ stream.writeInt(mdkArray[mdkIndex++]);
+ }
+ }
+
+ int fieldIndex = 0;
+ for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
+ if (null != RemoveDictionaryUtil.getMeasure(fieldIndex, row)) {
+ stream.write((byte) 1);
+ if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
+ Double val = (Double) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeDouble(val);
+ } else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ Double val = (Double) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeDouble(val);
+ } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ Long val = (Long) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeLong(val);
+ } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ byte[] bigDecimalInBytes = (byte[]) RemoveDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeInt(bigDecimalInBytes.length);
+ stream.write(bigDecimalInBytes);
+ }
+ } else {
+ stream.write((byte) 0);
+ }
+
+ fieldIndex++;
+ }
+
+ } catch (IOException e) {
+ throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+ }
+ }
+
private void finish() throws CarbonSortKeyAndGroupByException {
if (recordHolderHeap != null) {
int size = recordHolderHeap.size();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
new file mode 100644
index 0000000..ed9c018
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+public class NewRowComparator implements Comparator<Object[]> {
+
+ /**
+ * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+ */
+ private boolean[] noDictionaryColMaping;
+
+ /**
+ * @param noDictionaryColMaping
+ */
+ public NewRowComparator(boolean[] noDictionaryColMaping) {
+ this.noDictionaryColMaping = noDictionaryColMaping;
+ }
+
+ /**
+ * Below method will be used to compare two mdkey
+ */
+ public int compare(Object[] rowA, Object[] rowB) {
+ int diff = 0;
+
+ int index = 0;
+
+ for (boolean isNoDictionary : noDictionaryColMaping) {
+
+ if (isNoDictionary) {
+ byte[] byteArr1 = (byte[]) rowA[index];
+
+ byte[] byteArr2 = (byte[]) rowB[index];
+
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ int dimFieldA = (int) rowA[index];
+ int dimFieldB = (int) rowB[index];
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+
+ index++;
+ }
+
+ return diff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
new file mode 100644
index 0000000..80aa790
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.util.Comparator;
+
+/**
+ * This class is used as comparator for comparing dims which are non high cardinality dims.
+ * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
+ */
+public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+ /**
+ * dimension count
+ */
+ private int dimensionCount;
+
+ /**
+ * RowComparatorForNormalDims Constructor
+ *
+ * @param dimensionCount
+ */
+ public NewRowComparatorForNormalDims(int dimensionCount) {
+ this.dimensionCount = dimensionCount;
+ }
+
+ /**
+ * Below method will be used to compare two surrogate keys
+ *
+ * @see Comparator#compare(Object, Object)
+ */
+ public int compare(Object[] rowA, Object[] rowB) {
+ int diff = 0;
+
+ for (int i = 0; i < dimensionCount; i++) {
+
+ int dimFieldA = (int)rowA[i];
+ int dimFieldB = (int)rowB[i];
+ diff = dimFieldA - dimFieldB;
+ if (diff != 0) {
+ return diff;
+ }
+ }
+ return diff;
+ }
+}