You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:25 UTC
[38/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
new file mode 100644
index 0000000..5c81bb9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+
+/**
+ * Load-time description of one table column: the schema {@link CarbonColumn} together with an
+ * optional date format configured for that column.
+ */
+public class DataField implements Serializable {
+
+  /** Schema column described by this field. */
+  private CarbonColumn column;
+
+  /** Date format for this column; null unless {@link #setDateFormat} supplied one. */
+  private String dateFormat;
+
+  public DataField(CarbonColumn carbonColumn) {
+    this.column = carbonColumn;
+  }
+
+  /**
+   * @return the wrapped schema column
+   */
+  public CarbonColumn getColumn() {
+    return this.column;
+  }
+
+  /**
+   * @return true when the wrapped column carries {@link Encoding#DICTIONARY}
+   */
+  public boolean hasDictionaryEncoding() {
+    final Encoding dictionaryEncoding = Encoding.DICTIONARY;
+    return this.column.hasEncoding(dictionaryEncoding);
+  }
+
+  /**
+   * @return the configured date format, or null if none was set
+   */
+  public String getDateFormat() {
+    return this.dateFormat;
+  }
+
+  /**
+   * @param format date format to use for this column (may be null)
+   */
+  public void setDateFormat(String format) {
+    this.dateFormat = format;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
new file mode 100644
index 0000000..10b19b7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.exception.NoRetryException;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+/**
+ * Runs a data load: builds the step pipeline for a load request, initializes and executes it,
+ * and logs whether bad records were encountered.
+ */
+public class DataLoadExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
+
+  /** Step returned by {@link DataLoadProcessBuilder#build}; null until {@link #execute} runs. */
+  private AbstractDataLoadProcessorStep loadProcessorStep;
+
+  /** Set by {@link #close()} so that later calls do not close the pipeline again. */
+  private boolean isClosed;
+
+  /**
+   * Builds the load pipeline for {@code loadModel}, initializes it and executes it.
+   *
+   * <p>The table's bad-record logger key is removed afterwards in every case, whether the load
+   * succeeded or failed.
+   *
+   * @param loadModel describes the table and options of this load
+   * @param storeLocation temporary store locations handed to the pipeline builder
+   * @param inputIterators iterators supplying the input rows
+   * @throws NoRetryException if loading failed with a {@link BadRecordFoundException}
+   * @throws CarbonDataLoadingException for any other loading failure; any other exception is
+   *     logged and wrapped in a CarbonDataLoadingException
+   */
+  public void execute(CarbonLoadModel loadModel, String[] storeLocation,
+      CarbonIterator<Object[]>[] inputIterators) throws Exception {
+    try {
+      loadProcessorStep =
+          new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
+      // 1. initialize
+      loadProcessorStep.initialize();
+      LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
+      // 2. execute the step
+      loadProcessorStep.execute();
+      // Only checks for a bad record here; the key itself is removed in the finally block.
+      if (badRecordFound(
+          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())) {
+        LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
+      } else {
+        LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
+      }
+    } catch (CarbonDataLoadingException e) {
+      if (e instanceof BadRecordFoundException) {
+        // Rethrown as NoRetryException; as its name indicates, this failure is not to be retried.
+        throw new NoRetryException(e.getMessage());
+      } else {
+        throw e;
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
+      throw new CarbonDataLoadingException(
+          "Data Loading failed for table " + loadModel.getTableName(), e);
+    } finally {
+      removeBadRecordKey(
+          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
+    }
+  }
+
+  /**
+   * Checks whether the bad-record logger holds an entry for the given table.
+   *
+   * <p>This method only reads the entry; removal is done by {@link #removeBadRecordKey}.
+   *
+   * @param carbonTableIdentifier table whose bad-record logger key is looked up
+   * @return true if {@code BadRecordsLogger.hasBadRecord} returns a non-null value for the key
+   */
+  private boolean badRecordFound(CarbonTableIdentifier carbonTableIdentifier) {
+    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
+    return BadRecordsLogger.hasBadRecord(badRecordLoggerKey) != null;
+  }
+
+  /**
+   * Removes the table's key from the bad-record logger.
+   *
+   * @param carbonTableIdentifier table whose bad-record logger key is removed
+   */
+  private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) {
+    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
+    BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
+  }
+
+  /**
+   * Closes the pipeline built by {@link #execute}, if there is one. Calls after the first one
+   * do nothing.
+   */
+  public void close() {
+    if (!isClosed && loadProcessorStep != null) {
+      loadProcessorStep.close();
+    }
+    isClosed = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
new file mode 100644
index 0000000..05104a2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Builds the pipeline of steps that loads data into a carbon table.
+ *
+ * <p>Every pipeline starts with an input step and a conversion step and ends with a writer
+ * step. The table's sort configuration and bucketing decide two things: whether a sort step
+ * sits in between, and which writer is used (see {@link #build}).
+ */
+public final class DataLoadProcessBuilder {
+
+  /**
+   * Name of the placeholder measure that is added when the table has no measure; it is never
+   * loaded.
+   */
+  private static final String DEFAULT_DUMMY_MEASURE = "default_dummy_measure";
+
+  /**
+   * Creates the load configuration and builds the matching pipeline:
+   * <ul>
+   *   <li>not a sort table, or sort scope NO_SORT: input, convert, row writer (no sort)</li>
+   *   <li>bucketed table: input, bucketing convert, sort, writer</li>
+   *   <li>sort scope BATCH_SORT: input, convert, sort, batch writer</li>
+   *   <li>otherwise: input, convert, sort, writer</li>
+   * </ul>
+   *
+   * @param loadModel describes the table and options of this load
+   * @param storeLocation temporary store locations for this load
+   * @param inputIterators iterators supplying the input rows
+   * @return the writer step; each step is constructed around the step before it
+   * @throws Exception if the configuration or one of the steps cannot be created
+   */
+  public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation,
+      CarbonIterator[] inputIterators) throws Exception {
+    CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
+    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+    // Enum constants are singletons: == is exact and, unlike equals, safe for a null scope.
+    if (!configuration.isSortTable() || sortScope == SortScopeOptions.SortScope.NO_SORT) {
+      return buildInternalForNoSort(inputIterators, configuration);
+    } else if (configuration.getBucketingInfo() != null) {
+      return buildInternalForBucketing(inputIterators, configuration);
+    } else if (sortScope == SortScopeOptions.SortScope.BATCH_SORT) {
+      return buildInternalForBatchSort(inputIterators, configuration);
+    } else {
+      return buildInternal(inputIterators, configuration);
+    }
+  }
+
+  /** Default pipeline: input, convert, sort, writer. */
+  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  /** No-sort pipeline: input, convert, row writer. */
+  private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Writes the converted (unsorted) data in carbondata format; there is no sort step.
+    return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
+  }
+
+  /** Batch-sort pipeline: input, convert, sort, batch writer. */
+  private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format using the batch writer.
+    return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  /** Bucketing pipeline: input, bucketing convert, sort, writer. */
+  private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) throws Exception {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations, using the bucketing-aware converter.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  /**
+   * Performs three steps before building the load configuration:
+   * <ol>
+   *   <li>creates the temporary store locations;</li>
+   *   <li>publishes those locations through {@link CarbonProperties} under this load's temp
+   *   location key;</li>
+   *   <li>publishes the model's store path as {@code STORE_LOCATION_HDFS}.</li>
+   * </ol>
+   *
+   * @param loadModel describes the table and options of this load
+   * @param storeLocation temporary store locations; stored joined by {@link File#pathSeparator}
+   * @return the load configuration for {@code loadModel}
+   */
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
+      String[] storeLocation) {
+    CarbonDataProcessorUtil.createLocations(storeLocation);
+
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), false, false);
+    CarbonProperties.getInstance().addProperty(tempLocationKey,
+        StringUtils.join(storeLocation, File.pathSeparator));
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+
+    return createConfiguration(loadModel);
+  }
+
+  /**
+   * Builds the load configuration for {@code loadModel}. Side effect: registers the model's
+   * table with the global {@link CarbonMetadata} instance.
+   *
+   * @param loadModel describes the table and options of this load
+   * @return the populated load configuration
+   */
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel) {
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
+    configuration.setHeader(loadModel.getCsvHeaderColumns());
+    configuration.setPartitionId(loadModel.getPartitionId());
+    configuration.setSegmentId(loadModel.getSegmentId());
+    configuration.setTaskNo(loadModel.getTaskNo());
+    setDataLoadProperties(configuration, loadModel);
+    // Registers the table with the global CarbonMetadata before its columns are read.
+    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    Map<String, String> dateFormatMap =
+        CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
+    List<DataField> dataFields = createDataFields(dimensions, measures, dateFormatMap);
+    configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    // configuration for one pass load: dictionary server info
+    configuration.setUseOnePass(loadModel.getUseOnePass());
+    configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
+    configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+    configuration.setPreFetch(loadModel.isPreFetch());
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+    configuration.setTableSpec(new TableSpec(dimensions, measures));
+    return configuration;
+  }
+
+  /**
+   * Copies the per-load options of {@code loadModel} into {@code configuration} as data load
+   * properties.
+   */
+  private static void setDataLoadProperties(CarbonDataLoadConfiguration configuration,
+      CarbonLoadModel loadModel) {
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+        new String[] { loadModel.getComplexDelimiterLevel1(),
+            loadModel.getComplexDelimiterLevel2() });
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+        getOptionValue(loadModel.getSerializationNullFormat()));
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
+        loadModel.getFactTimeStamp());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
+        getOptionValue(loadModel.getBadRecordsLoggerEnable()));
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
+        getOptionValue(loadModel.getBadRecordsAction()));
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
+        getOptionValue(loadModel.getIsEmptyDataBadRecord()));
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
+        loadModel.getFactFilePath());
+    configuration
+        .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        loadModel.getBatchSortSizeInMb());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        loadModel.getGlobalSortPartitions());
+    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        loadModel.getBadRecordsLocation());
+  }
+
+  /**
+   * Returns the second comma-separated token of a load-model option string. These options
+   * appear to be stored as {@code "<name>,<value>"} (assumption based on the index used; TODO
+   * confirm against CarbonLoadModel).
+   *
+   * <p>NOTE(review): a value that itself contains ',' is truncated at that comma, and an
+   * ArrayIndexOutOfBoundsException is thrown when the split yields fewer than two tokens.
+   */
+  private static String getOptionValue(String option) {
+    return option.split(",")[1];
+  }
+
+  /**
+   * Creates the load fields in the order the pipeline expects. Non-complex dimensions come
+   * first, because they are part of the MDK key. Complex dimensions follow, then measures. The
+   * placeholder {@link #DEFAULT_DUMMY_MEASURE} is skipped. Only dimensions get a date format,
+   * looked up by column name in {@code dateFormatMap}.
+   */
+  private static List<DataField> createDataFields(List<CarbonDimension> dimensions,
+      List<CarbonMeasure> measures, Map<String, String> dateFormatMap) {
+    List<DataField> dataFields = new ArrayList<>();
+    List<DataField> complexDataFields = new ArrayList<>();
+    for (CarbonColumn column : dimensions) {
+      DataField dataField = new DataField(column);
+      dataField.setDateFormat(dateFormatMap.get(column.getColName()));
+      if (column.isComplex()) {
+        complexDataFields.add(dataField);
+      } else {
+        dataFields.add(dataField);
+      }
+    }
+    dataFields.addAll(complexDataFields);
+    for (CarbonColumn column : measures) {
+      if (!DEFAULT_DUMMY_MEASURE.equals(column.getColName())) {
+        dataFields.add(new DataField(column));
+      }
+    }
+    return dataFields;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java b/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
new file mode 100644
index 0000000..6e5f91a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading;
+
+/**
+ * Causes used to classify why a data load failed.
+ *
+ * <p>The constants keep their declaration order, so each constant's {@code ordinal()} value
+ * stays the same.
+ */
+public enum FailureCauses {
+  NONE, BAD_RECORDS, EXECUTOR_FAILURE,
+  STATUS_FILE_UPDATION_FAILURE, MULTIPLE_INPUT_ROWS_MATCHING
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
new file mode 100644
index 0000000..ecd46cc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.complexobjects;
+
+/**
+ * Wraps the element values of an array value. This is presumably a complex ARRAY column value;
+ * confirm with callers.
+ *
+ * <p>The array is stored by reference: it is not copied on construction, on
+ * {@link #setData}, or on {@link #getData()}.
+ */
+public class ArrayObject {
+
+  /** Element values, shared with whoever supplied them. */
+  private Object[] data;
+
+  public ArrayObject(Object[] elements) {
+    this.data = elements;
+  }
+
+  /** @return the wrapped element values (the same array instance that was supplied) */
+  public Object[] getData() {
+    return this.data;
+  }
+
+  /** Replaces the wrapped element values; the array is stored without copying. */
+  public void setData(Object[] elements) {
+    this.data = elements;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
new file mode 100644
index 0000000..c026a48
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.complexobjects;
+
+/**
+ * Wraps the field values of a struct value. This is presumably a complex STRUCT column value;
+ * confirm with callers.
+ *
+ * <p>The array is stored by reference: it is not copied on construction, on
+ * {@link #setData}, or on {@link #getData()}.
+ */
+public class StructObject {
+
+  /** Field values, shared with whoever supplied them. */
+  private Object[] data;
+
+  public StructObject(Object[] fields) {
+    this.data = fields;
+  }
+
+  /** @return the wrapped field values (the same array instance that was supplied) */
+  public Object[] getData() {
+    return this.data;
+  }
+
+  /** Replaces the wrapped field values; the array is stored without copying. */
+  public void setData(Object[] fields) {
+    this.data = fields;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
new file mode 100644
index 0000000..260661b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.constants;
+
+/**
+ * Constants used in data loading.
+ */
+public final class DataLoadProcessorConstants {
+
+ public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
+
+ public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
+
+ public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
+
+ public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
+
+ public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
+
+ public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
+
+ public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
new file mode 100644
index 0000000..aeb4d15
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holds bad-record state for one conversion:
+ * <ul>
+ *   <li>the last reason that was set;</li>
+ *   <li>a map of per-column messages;</li>
+ *   <li>a flag for whether a bad record was added;</li>
+ *   <li>a flag for whether it was logged.</li>
+ * </ul>
+ */
+public class BadRecordLogHolder {
+
+  /**
+   * this map will hold the bad record unified message for columns
+   */
+  private final Map<String, String> columnMessageMap = new HashMap<>();
+
+  /** Reason from the last {@link #setReason} call; {@link #clear()} does not reset it. */
+  private String reason;
+
+  /** True after {@link #setReason} until the next {@link #clear()}. */
+  private boolean badRecordAdded;
+
+  private boolean isLogged;
+
+  public String getReason() {
+    return reason;
+  }
+
+  /**
+   * Records {@code reason} and marks that a bad record was added.
+   *
+   * @param reason description of why the record is bad
+   */
+  public void setReason(String reason) {
+    this.reason = reason;
+    badRecordAdded = true;
+  }
+
+  /**
+   * Despite its name, returns {@code true} when a bad record HAS been added, i.e. after
+   * {@link #setReason} and before the next {@link #clear()}. This behavior is kept unchanged
+   * for compatibility with existing callers; new code should use {@link #isBadRecordAdded()}.
+   *
+   * @return true if a bad record has been added
+   */
+  public boolean isBadRecordNotAdded() {
+    return isBadRecordAdded();
+  }
+
+  /**
+   * @return true after {@link #setReason} has been called and before the next {@link #clear()}
+   */
+  public boolean isBadRecordAdded() {
+    return badRecordAdded;
+  }
+
+  /**
+   * Resets the bad-record-added flag. The reason, the logged flag and the per-column messages
+   * are left unchanged.
+   */
+  public void clear() {
+    this.badRecordAdded = false;
+  }
+
+  public boolean isLogged() {
+    return isLogged;
+  }
+
+  public void setLogged(boolean logged) {
+    isLogged = logged;
+  }
+
+  /**
+   * @return the live, mutable per-column message map (not a copy)
+   */
+  public Map<String, String> getColumnMessageMap() {
+    return columnMessageMap;
+  }
+
+  /**
+   * Clears the per-column message map.
+   */
+  public void finish() {
+    // The map is final and initialized at declaration, so it is never null here.
+    columnMessageMap.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
new file mode 100644
index 0000000..aa84fc3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter;
+
+/**
+ * Source of the current cardinality values of dimensions.
+ */
+public interface DictionaryCardinalityFinder {
+
+  /**
+   * @return the current cardinality values of the dimensions; the order of the values is
+   *     defined by the implementation (NOTE(review): confirm the expected order with callers)
+   */
+  int[] getCardinality();
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
new file mode 100644
index 0000000..8a3e2eb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * Converts/transforms a single column field of a row in place.
+ */
+public interface FieldConverter {
+
+  /**
+   * Converts the column field and writes the converted value back into the same
+   * location/index of the row.
+   *
+   * <p>This method does not return a status: when the field is treated as a bad record,
+   * implementations record the failure reason on {@code logHolder} instead.
+   *
+   * @param row the row whose field is converted and updated in place
+   * @param logHolder receives the bad-record reason when the field cannot be converted
+   * @throws CarbonDataLoadingException if the conversion fails (e.g. dictionary key
+   *                                    generation or complex-type serialization errors)
+   */
+  void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
new file mode 100644
index 0000000..fd3a650
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * Converts a complete row, field by field, and exposes the resulting dimension
+ * cardinalities through {@link DictionaryCardinalityFinder}.
+ */
+public interface RowConverter extends DictionaryCardinalityFinder {
+
+  /**
+   * Prepares the converter before the first call to {@link #convert(CarbonRow)}.
+   *
+   * @throws IOException if initialization requires I/O that fails
+   */
+  void initialize() throws IOException;
+
+  /**
+   * Converts the given row.
+   *
+   * @param row the row to convert
+   * @return the converted row
+   * @throws CarbonDataLoadingException if the row cannot be converted
+   */
+  CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
+
+  /**
+   * Creates a copy of this converter intended to be used by another thread.
+   *
+   * @return a new converter instance
+   */
+  RowConverter createCopyForNewThread();
+
+  /**
+   * Called once conversion is complete.
+   * NOTE(review): presumably releases resources held by the converter; confirm against
+   * the implementations.
+   */
+  void finish();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..5349e33
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+
+/**
+ * Base class for field converters that can contribute column cardinality information
+ * (dictionary, direct-dictionary and complex columns).
+ */
+public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
+
+  /**
+   * Appends the cardinality information of this converter's column to the given list.
+   *
+   * @param cardinality list collecting the cardinalities of all converted columns
+   */
+  public abstract void fillColumnCardinality(List<Integer> cardinality);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
new file mode 100644
index 0000000..5ac832d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+ private GenericDataType genericDataType;
+
+ private int index;
+
+ public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
+ this.genericDataType = genericDataType;
+ this.index = index;
+ }
+
+ @Override
+ public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+ Object object = row.getObject(index);
+ // TODO Its temporary, needs refactor here.
+ ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+ try {
+ genericDataType.writeByteArray(object, dataOutputStream);
+ dataOutputStream.close();
+ row.update(byteArray.toByteArray(), index);
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(object + "", e);
+ }
+ }
+
+ @Override public void fillColumnCardinality(List<Integer> cardinality) {
+ genericDataType.fillCardinality(cardinality);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..2671393
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
+import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Field converter for dictionary-encoded dimensions: replaces the raw value with its
+ * dictionary surrogate key.
+ */
+public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
+
+  private final BiDictionary<Integer, Object> dictionaryGenerator;
+
+  private final int index;
+
+  private final CarbonDimension carbonDimension;
+
+  private final String nullFormat;
+
+  private Dictionary dictionary;
+
+  private DictionaryMessage dictionaryMessage;
+
+  private final boolean isEmptyBadRecord;
+
+  /**
+   * Creates the converter and sets up the dictionary used to generate surrogate keys.
+   *
+   * <p>With {@code useOnePass}, keys come from a {@link DictionaryServerClientDictionary}
+   * backed by {@code client} and {@code localCache}; an existing dictionary is loaded from
+   * {@code cache} only if its file already exists. Otherwise the dictionary is loaded from
+   * {@code cache} and wrapped in a {@link PreCreatedDictionary}.
+   *
+   * @throws IOException propagated from the dictionary cache lookup
+   */
+  public DictionaryFieldConverterImpl(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
+      DictionaryClient client, boolean useOnePass, String storePath,
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
+    this.index = index;
+    this.carbonDimension = (CarbonDimension) dataField.getColumn();
+    this.nullFormat = nullFormat;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+    DictionaryColumnUniqueIdentifier identifier =
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+            dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
+            CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+
+    // if use one pass, use DictionaryServerClientDictionary
+    if (useOnePass) {
+      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+        dictionary = cache.get(identifier);
+      }
+      dictionaryMessage = new DictionaryMessage();
+      dictionaryMessage.setColumnName(dataField.getColumn().getColName());
+      // for table initialization
+      dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+      dictionaryMessage.setData("0");
+      // for generate dictionary
+      dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
+      dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+          dictionaryMessage, localCache);
+    } else {
+      dictionary = cache.get(identifier);
+      dictionaryGenerator = new PreCreatedDictionary(dictionary);
+    }
+  }
+
+  /**
+   * Replaces the value at {@code index} with its dictionary surrogate key.
+   *
+   * <p>A {@code null} value, or one equal to the null format, is looked up as
+   * {@link CarbonCommonConstants#MEMBER_DEFAULT_VAL}. A value that cannot be parsed for the
+   * dimension's data type is stored as
+   * {@link CarbonCommonConstants#MEMBER_DEFAULT_VAL_SURROGATE_KEY}. Its failure reason is set
+   * on {@code logHolder} unless the value is empty and empty values are not bad records.
+   *
+   * @throws CarbonDataLoadingException if surrogate key generation fails
+   */
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    try {
+      String dimensionValue = row.getString(index);
+      String parsedValue;
+      if (dimensionValue == null || dimensionValue.equals(nullFormat)) {
+        parsedValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+      } else {
+        parsedValue = DataTypeUtil.parseValue(dimensionValue, carbonDimension);
+      }
+      if (parsedValue == null) {
+        // dimensionValue is non-null here: a null value was mapped to MEMBER_DEFAULT_VAL above.
+        if (!dimensionValue.isEmpty() || isEmptyBadRecord) {
+          logHolder.setReason(getFailureReason(logHolder));
+        }
+        row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
+      } else {
+        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
+      }
+    } catch (DictionaryGenerationException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Returns this column's failure reason from the holder's column message map, preparing
+   * and caching it there on first use.
+   */
+  private String getFailureReason(BadRecordLogHolder logHolder) {
+    String columnName = carbonDimension.getColName();
+    String message = logHolder.getColumnMessageMap().get(columnName);
+    if (null == message) {
+      message = CarbonDataProcessorUtil.prepareFailureReason(
+          columnName, carbonDimension.getDataType());
+      logHolder.getColumnMessageMap().put(columnName, message);
+    }
+    return message;
+  }
+
+  /**
+   * Appends the current size of the dictionary generator as this column's cardinality.
+   */
+  @Override
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(dictionaryGenerator.size());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..24c2f00
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Field converter for direct-dictionary dimensions: the surrogate key is computed from
+ * the value itself by a {@link DirectDictionaryGenerator}.
+ */
+public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  /**
+   * Surrogate key written for null and null-format values; the generator also returns it
+   * when a value cannot be converted.
+   */
+  private static final int DEFAULT_SURROGATE_KEY = 1;
+
+  private DirectDictionaryGenerator directDictionaryGenerator;
+
+  private int index;
+
+  private String nullFormat;
+
+  private CarbonColumn column;
+
+  private boolean isEmptyBadRecord;
+
+  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
+      boolean isEmptyBadRecord) {
+    this.nullFormat = nullFormat;
+    this.column = dataField.getColumn();
+    this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+    // Use the column's custom date format only when one is actually configured.
+    String dateFormat = dataField.getDateFormat();
+    boolean hasDateFormat = dateFormat != null && !dateFormat.isEmpty();
+    this.directDictionaryGenerator = hasDateFormat
+        ? DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(column.getDataType(), dateFormat)
+        : DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(column.getDataType());
+  }
+
+  /**
+   * Replaces the value at {@code index} with its direct surrogate key.
+   *
+   * <p>A {@code null} value always records a failure reason. A value that fails conversion
+   * records the column's cached reason unless it is empty and empty values are not bad
+   * records.
+   */
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    String rawValue = row.getString(index);
+    if (rawValue == null) {
+      logHolder.setReason(
+          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
+      row.update(DEFAULT_SURROGATE_KEY, index);
+      return;
+    }
+    if (rawValue.equals(nullFormat)) {
+      row.update(DEFAULT_SURROGATE_KEY, index);
+      return;
+    }
+    int surrogateKey = directDictionaryGenerator.generateDirectSurrogateKey(rawValue);
+    boolean conversionFailed = surrogateKey == DEFAULT_SURROGATE_KEY;
+    if (conversionFailed && (!rawValue.isEmpty() || isEmptyBadRecord)) {
+      logHolder.setReason(cachedFailureReason(logHolder));
+    }
+    row.update(surrogateKey, index);
+  }
+
+  /**
+   * Looks up this column's failure reason in the holder's column message map, preparing
+   * and storing it there when absent.
+   */
+  private String cachedFailureReason(BadRecordLogHolder logHolder) {
+    String columnName = column.getColName();
+    String reason = logHolder.getColumnMessageMap().get(columnName);
+    if (reason != null) {
+      return reason;
+    }
+    reason = CarbonDataProcessorUtil.prepareFailureReason(columnName, column.getDataType());
+    logHolder.getColumnMessageMap().put(columnName, reason);
+    return reason;
+  }
+
+  /**
+   * Direct-dictionary columns report {@link Integer#MAX_VALUE} as their cardinality.
+   */
+  @Override
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(Integer.MAX_VALUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
new file mode 100644
index 0000000..2efbe26
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
+import org.apache.carbondata.processing.datatypes.StructDataType;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+
+/**
+ * Creates the {@link FieldConverter} appropriate for each column of a load.
+ */
+public class FieldEncoderFactory {
+
+  /**
+   * Eagerly created: static initialization is thread-safe, unlike the unsynchronized
+   * lazy check-then-create it replaces, which could race when called from several threads.
+   */
+  private static final FieldEncoderFactory INSTANCE = new FieldEncoderFactory();
+
+  private FieldEncoderFactory() {
+  }
+
+  /**
+   * Returns the shared factory instance.
+   */
+  public static FieldEncoderFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates the FieldConverter for a column.
+   *
+   * <p>Dimensions get a complex, direct-dictionary, dictionary or no-dictionary converter
+   * depending on their type and encoding; every other column gets a
+   * {@link MeasureFieldConverterImpl}.
+   *
+   * @param dataField column schema
+   * @param cache dictionary cache
+   * @param carbonTableIdentifier table identifier
+   * @param index index of column in the row
+   * @param nullFormat value that is treated as null
+   * @param client dictionary client, used when {@code useOnePass} is set
+   * @param useOnePass whether dictionary keys are generated through the dictionary client
+   * @param storePath store location of the table
+   * @param localCache local value-to-key cache handed to the one-pass dictionary
+   * @param isEmptyBadRecord whether an empty value is reported as a bad record
+   * @return the converter for the column, never {@code null}
+   * @throws IOException if creating a dictionary converter fails to load its dictionary
+   */
+  public FieldConverter createFieldEncoder(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass, String storePath,
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
+      throws IOException {
+    CarbonColumn column = dataField.getColumn();
+    if (!column.isDimension()) {
+      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+    }
+    // Complex columns are checked first, so the encoding checks below only see
+    // non-complex dimensions.
+    if (column.isComplex()) {
+      return new ComplexFieldConverterImpl(
+          createComplexType(dataField, cache, carbonTableIdentifier,
+              client, useOnePass, storePath, localCache), index);
+    }
+    if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
+          isEmptyBadRecord);
+    }
+    if (column.hasEncoding(Encoding.DICTIONARY)) {
+      return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
+          index, client, useOnePass, storePath, localCache, isEmptyBadRecord);
+    }
+    return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+  }
+
+  /**
+   * Create parser for the carbon column.
+   */
+  private static GenericDataType createComplexType(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, Map<Object, Integer> localCache) {
+    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
+        carbonTableIdentifier, client, useOnePass, storePath, localCache);
+  }
+
+  /**
+   * Builds the GenericDataType tree for a column, recursing into the children of
+   * array and struct columns.
+   *
+   * @return GenericDataType
+   * @throws UnsupportedOperationException for map columns
+   */
+  private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, Map<Object, Integer> localCache) {
+    switch (carbonColumn.getDataType()) {
+      case ARRAY:
+        List<CarbonDimension> listOfChildDimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create array parser with complex delimiter
+        ArrayDataType arrayDataType =
+            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : listOfChildDimensions) {
+          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier, client, useOnePass, storePath, localCache));
+        }
+        return arrayDataType;
+      case STRUCT:
+        List<CarbonDimension> dimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create struct parser with complex delimiter
+        StructDataType structDataType =
+            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : dimensions) {
+          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier, client, useOnePass, storePath, localCache));
+        }
+        return structDataType;
+      case MAP:
+        throw new UnsupportedOperationException("Complex type Map is not supported yet");
+      default:
+        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
+            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+            carbonTableIdentifier, client, useOnePass, storePath, localCache);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
new file mode 100644
index 0000000..06f7589
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Converter for measure columns: parses the raw string value into the measure's data type.
+ */
+public class MeasureFieldConverterImpl implements FieldConverter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
+
+  private final int index;
+
+  private final DataType dataType;
+
+  private final CarbonMeasure measure;
+
+  private final String nullformat;
+
+  private final boolean isEmptyBadRecord;
+
+  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
+    this.dataType = dataField.getColumn().getDataType();
+    this.measure = (CarbonMeasure) dataField.getColumn();
+    this.nullformat = nullformat;
+    this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+  }
+
+  /**
+   * Parses the value at {@code index} into the measure's data type and stores it back into
+   * the row. Null, default-member, empty, null-format and unparsable values are stored as
+   * {@code null}. A failure reason is set on {@code logHolder} for unparsable values, and for
+   * empty values when empty values are bad records.
+   */
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    String value = row.getString(index);
+    if (value == null || CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value)) {
+      // NOTE(review): the reason is only prepared and cached here, never set on logHolder,
+      // so a null measure is not reported as a bad record. Kept to preserve behavior.
+      getFailureReason(logHolder);
+      row.update(null, index);
+    } else if (value.isEmpty()) {
+      if (isEmptyBadRecord) {
+        logHolder.setReason(getFailureReason(logHolder));
+      }
+      row.update(null, index);
+    } else if (value.equals(nullformat)) {
+      row.update(null, index);
+    } else {
+      try {
+        row.update(DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure), index);
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Cannot convert value of column " + measure.getColName()
+            + " to numeric type. Value considered as null.");
+        logHolder.setReason(getFailureReason(logHolder));
+        row.update(null, index);
+      }
+    }
+  }
+
+  /**
+   * Returns this measure's failure reason from the holder's column message map, preparing
+   * and caching it there on first use.
+   */
+  private String getFailureReason(BadRecordLogHolder logHolder) {
+    String columnName = measure.getColName();
+    String message = logHolder.getColumnMessageMap().get(columnName);
+    if (null == message) {
+      message = CarbonDataProcessorUtil.prepareFailureReason(columnName, measure.getDataType());
+      logHolder.getColumnMessageMap().put(columnName, message);
+    }
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..8170680
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class NonDictionaryFieldConverterImpl implements FieldConverter {
+
+ private DataType dataType;
+
+ private int index;
+
+ private String nullformat;
+
+ private CarbonColumn column;
+
+ private boolean isEmptyBadRecord;
+
+ private DataField dataField;
+
+ public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
+ boolean isEmptyBadRecord) {
+ this.dataField = dataField;
+ this.dataType = dataField.getColumn().getDataType();
+ this.column = dataField.getColumn();
+ this.index = index;
+ this.nullformat = nullformat;
+ this.isEmptyBadRecord = isEmptyBadRecord;
+ }
+
+ @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+ String dimensionValue = row.getString(index);
+ if (null == dimensionValue && column.getDataType() != DataType.STRING) {
+ logHolder.setReason(
+ CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
+ updateWithNullValue(row);
+ } else if (dimensionValue == null || dimensionValue.equals(nullformat)) {
+ updateWithNullValue(row);
+ } else {
+ try {
+ row.update(DataTypeUtil
+ .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType,
+ dataField.getDateFormat()), index);
+ } catch (Throwable ex) {
+ if (dimensionValue.length() > 0 || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+ String message = logHolder.getColumnMessageMap().get(column.getColName());
+ if (null == message) {
+ message = CarbonDataProcessorUtil
+ .prepareFailureReason(column.getColName(), column.getDataType());
+ logHolder.getColumnMessageMap().put(column.getColName(), message);
+ }
+ logHolder.setReason(message);
+ updateWithNullValue(row);
+ } else {
+ updateWithNullValue(row);
+ }
+ }
+ }
+ }
+
+ private void updateWithNullValue(CarbonRow row) {
+ if (dataType == DataType.STRING) {
+ row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+ } else {
+ row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
new file mode 100644
index 0000000..a4351ae
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * It converts the complete row if necessary, dictionary columns are encoded with dictionary values
+ * and nondictionary values are converted to binary.
+ */
+public class RowConverterImpl implements RowConverter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RowConverterImpl.class.getName());
+
+ private CarbonDataLoadConfiguration configuration;
+
+ private DataField[] fields;
+
+ private FieldConverter[] fieldConverters;
+
+ private BadRecordsLogger badRecordLogger;
+
+ private BadRecordLogHolder logHolder;
+
+ private List<DictionaryClient> dictClients = new ArrayList<>();
+
+ private ExecutorService executorService;
+
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+
+ private Map<Object, Integer>[] localCaches;
+
+ public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+ BadRecordsLogger badRecordLogger) {
+ this.fields = fields;
+ this.configuration = configuration;
+ this.badRecordLogger = badRecordLogger;
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
+ configuration.getTableIdentifier().getStorePath());
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
+ List<FieldConverter> fieldConverterList = new ArrayList<>();
+ localCaches = new Map[fields.length];
+ long lruCacheStartTime = System.currentTimeMillis();
+ DictionaryClient client = createDictionaryClient();
+ dictClients.add(client);
+
+ for (int i = 0; i < fields.length; i++) {
+ localCaches[i] = new ConcurrentHashMap<>();
+ FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+ .createFieldEncoder(fields[i], cache,
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+ configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+ localCaches[i], isEmptyBadRecord);
+ fieldConverterList.add(fieldConverter);
+ }
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
+ fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+ logHolder = new BadRecordLogHolder();
+ }
+
+ private DictionaryClient createDictionaryClient() {
+ // for one pass load, start the dictionary client
+ if (configuration.getUseOnePass()) {
+ if (executorService == null) {
+ executorService = Executors.newCachedThreadPool();
+ }
+ Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
+ @Override
+ public DictionaryClient call() throws Exception {
+ Thread.currentThread().setName("Dictionary client");
+ DictionaryClient dictionaryClient = new DictionaryClient();
+ dictionaryClient.startClient(configuration.getDictionaryServerHost(),
+ configuration.getDictionaryServerPort());
+ return dictionaryClient;
+ }
+ });
+
+ try {
+ // wait for client initialization finished, or will raise null pointer exception
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+
+ try {
+ return result.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+ //TODO: only copy if it is bad record
+ CarbonRow copy = row.getCopy();
+ logHolder.setLogged(false);
+ logHolder.clear();
+ for (int i = 0; i < fieldConverters.length; i++) {
+ fieldConverters[i].convert(row, logHolder);
+ if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
+ badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
+ if (badRecordLogger.isDataLoadFail()) {
+ String error = "Data load failed due to bad record: " + logHolder.getReason() +
+ "Please enable bad record logger to know the detail reason.";
+ throw new BadRecordFoundException(error);
+ }
+ logHolder.clear();
+ logHolder.setLogged(true);
+ if (badRecordLogger.isBadRecordConvertNullDisable()) {
+ return null;
+ }
+ }
+ }
+ return row;
+ }
+
+ @Override
+ public void finish() {
+ // close dictionary client when finish write
+ if (configuration.getUseOnePass()) {
+ for (DictionaryClient client : dictClients) {
+ if (client != null) {
+ client.shutDown();
+ }
+ }
+ if (null != logHolder) {
+ logHolder.finish();
+ }
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+ }
+ }
+
+ @Override
+ public RowConverter createCopyForNewThread() {
+ RowConverterImpl converter =
+ new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
+ List<FieldConverter> fieldConverterList = new ArrayList<>();
+ DictionaryClient client = createDictionaryClient();
+ dictClients.add(client);
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
+ for (int i = 0; i < fields.length; i++) {
+ FieldConverter fieldConverter = null;
+ try {
+ fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+ configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+ localCaches[i], isEmptyBadRecord);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ fieldConverterList.add(fieldConverter);
+ }
+ converter.fieldConverters =
+ fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+ converter.logHolder = new BadRecordLogHolder();
+ return converter;
+ }
+
+ @Override public int[] getCardinality() {
+ List<Integer> dimCardinality = new ArrayList<>();
+ if (fieldConverters != null) {
+ for (int i = 0; i < fieldConverters.length; i++) {
+ if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+ ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+ .fillColumnCardinality(dimCardinality);
+ }
+ }
+ }
+ int[] cardinality = new int[dimCardinality.size()];
+ for (int i = 0; i < dimCardinality.size(); i++) {
+ cardinality[i] = dimCardinality.get(i);
+ }
+ return cardinality;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
new file mode 100644
index 0000000..d0c8a73
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * blocks info
+ */
+public class BlockDetails extends FileSplit implements Serializable {
+
+ /**
+ * serialization version
+ */
+ private static final long serialVersionUID = 2293906691860002339L;
+ //block offset
+ private long blockOffset;
+ //block length
+ private long blockLength;
+ //file path which block belong to
+ private String filePath;
+ // locations where this block exists
+ private String[] locations;
+
+ public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+ super(filePath, blockOffset, blockLength, locations);
+ this.filePath = filePath.toString();
+ this.blockOffset = blockOffset;
+ this.blockLength = blockLength;
+ this.locations = locations;
+ }
+
+ public long getBlockOffset() {
+ return blockOffset;
+ }
+
+ public long getBlockLength() {
+ return blockLength;
+ }
+
+ public String getFilePath() {
+ return FileFactory.getUpdatedFilePath(filePath);
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ /** The file containing this split's data. */
+ @Override
+ public Path getPath() { return new Path(filePath); }
+
+ /** The position of the first byte in the file to process. */
+ @Override
+ public long getStart() { return blockOffset; }
+
+ /** The number of bytes in the file to process. */
+ @Override
+ public long getLength() { return blockLength; }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
new file mode 100644
index 0000000..6fe9107
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * Customized reader class that reads data from the wrapped stream until an upper threshold is
 * reached. Once the threshold has been consumed, it keeps reading until the next new line
 * character, so that a line straddling the threshold is returned in full.
 */
public class BoundedInputStream extends InputStream {

  /**
   * byte value of the new line character
   */
  private static final byte END_OF_LINE_BYTE_VALUE = '\n';

  /**
   * amount by which the read window grows while searching for the end of line after the
   * threshold has been reached
   */
  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;

  /**
   * number of bytes remaining in the current read window; 0 means end of stream
   */
  private long remaining;

  /**
   * true once the initial threshold has been consumed and the stream is reading on to the next
   * new line character
   */
  private boolean endOfLineFound = false;

  /**
   * reusable one-byte buffer so that {@link #read()} goes through the same logic as the bulk read
   */
  private final byte[] singleByte = new byte[1];

  private DataInputStream in;

  /**
   * @param in    stream to read from
   * @param limit number of bytes to read before switching to reading until the next new line
   */
  public BoundedInputStream(DataInputStream in, long limit) {
    this.in = in;
    this.remaining = limit;
  }

  /**
   * Reads a single byte.
   *
   * <p>This delegates to {@link #read(byte[], int, int)}, so single-byte reads see exactly the
   * same bytes as bulk reads, including the bytes read past the threshold up to the new line.
   *
   * @return the byte as an unsigned value in 0-255, or -1 at end of stream
   * @throws IOException
   *           problem while reading
   */
  @Override
  public int read() throws IOException {
    int count = read(singleByte, 0, 1);
    // Per the InputStream contract, a one-byte request yields either one byte or end of stream.
    return count <= 0 ? -1 : singleByte[0] & 0xFF;
  }

  /**
   * Below method will be used to read the data from file. If limit reaches in
   * that case it will read until new line character is reached
   *
   * @param buffer
   *          buffer in which data will be read
   * @param offset
   *          from position to buffer will be filled
   * @param length
   *          maximum number of bytes to be read
   * @return number of bytes read, 0 when {@code length} is 0, or -1 at end of stream
   * @throws IOException
   *           problem while reading
   */
  @Override
  public int read(byte[] buffer, int offset, int length) throws IOException {
    if (length == 0) {
      // InputStream contract: a zero-length request reads nothing and returns 0. Returning
      // early also stops the end-of-line search from widening the window without reading.
      return 0;
    }
    if (this.remaining == 0) {
      return -1;
    }
    if (this.remaining < length) {
      length = (int) this.remaining;
    }
    int bytesRead = this.in.read(buffer, offset, length);
    if (bytesRead < 0) {
      return bytesRead;
    }
    this.remaining -= bytesRead;
    if (this.remaining == 0 && !endOfLineFound) {
      // threshold consumed for the first time: keep reading until the next new line
      endOfLineFound = true;
      this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
    } else if (endOfLineFound) {
      int end = offset + bytesRead;
      for (int i = offset; i < end; i++) {
        if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
          // return up to and including the new line, then report end of stream
          this.remaining = 0;
          return (i - offset) + 1;
        }
      }
      this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
    }
    return bytesRead;
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }

  /**
   * @return number of bytes remaining in the current read window
   */
  public long getRemaining() {
    return this.remaining;
  }

}