You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:19 UTC
[32/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
deleted file mode 100644
index 50bfaff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.processing.newflow.converter.DictionaryCardinalityFinder;
-
-public class CarbonDataLoadConfiguration {
-
- private DataField[] dataFields;
-
- private AbsoluteTableIdentifier tableIdentifier;
-
- private String[] header;
-
- private String partitionId;
-
- private String segmentId;
-
- private String taskNo;
-
- private BucketingInfo bucketingInfo;
-
- private Map<String, Object> dataLoadProperties = new HashMap<>();
-
- /**
- * Use one pass to generate dictionary
- */
- private boolean useOnePass;
-
- /**
- * dictionary server host
- */
- private String dictionaryServerHost;
-
- /**
- * dictionary sever port
- */
- private int dictionaryServerPort;
-
- private boolean preFetch;
-
- private int dimensionCount;
-
- private int measureCount;
-
- private int noDictionaryCount;
-
- private int complexColumnCount;
-
- /**
- * schema updated time stamp to be used for restructure scenarios
- */
- private long schemaUpdatedTimeStamp;
-
- private DictionaryCardinalityFinder cardinalityFinder;
-
- private int numberOfSortColumns;
-
- private int numberOfNoDictSortColumns;
-
- // contains metadata used in write step of loading process
- private TableSpec tableSpec;
-
- public CarbonDataLoadConfiguration() {
- }
-
- public void setDataFields(DataField[] dataFields) {
- this.dataFields = dataFields;
-
- // set counts for each column category
- for (DataField dataField : dataFields) {
- CarbonColumn column = dataField.getColumn();
- if (column.isDimension()) {
- dimensionCount++;
- if (!dataField.hasDictionaryEncoding()) {
- noDictionaryCount++;
- }
- }
- if (column.isComplex()) {
- complexColumnCount++;
- }
- if (column.isMeasure()) {
- measureCount++;
- }
- }
- }
-
- public DataField[] getDataFields() {
- return dataFields;
- }
-
- public int getDimensionCount() {
- return dimensionCount;
- }
-
- public int getNoDictionaryCount() {
- return noDictionaryCount;
- }
-
- public int getComplexColumnCount() {
- return complexColumnCount;
- }
-
- public int getMeasureCount() {
- return measureCount;
- }
-
- public void setNumberOfSortColumns(int numberOfSortColumns) {
- this.numberOfSortColumns = numberOfSortColumns;
- }
-
- public int getNumberOfSortColumns() {
- return this.numberOfSortColumns;
- }
-
- public boolean isSortTable() {
- return this.numberOfSortColumns > 0;
- }
-
- public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
- this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
- }
-
- public int getNumberOfNoDictSortColumns() {
- return this.numberOfNoDictSortColumns;
- }
-
- public String[] getHeader() {
- return header;
- }
-
- public void setHeader(String[] header) {
- this.header = header;
- }
-
- public AbsoluteTableIdentifier getTableIdentifier() {
- return tableIdentifier;
- }
-
- public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
- this.tableIdentifier = tableIdentifier;
- }
-
- public String getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
- public String getSegmentId() {
- return segmentId;
- }
-
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- public String getTaskNo() {
- return taskNo;
- }
-
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- public void setDataLoadProperty(String key, Object value) {
- dataLoadProperties.put(key, value);
- }
-
- public Object getDataLoadProperty(String key) {
- return dataLoadProperties.get(key);
- }
-
- public BucketingInfo getBucketingInfo() {
- return bucketingInfo;
- }
-
- public void setBucketingInfo(BucketingInfo bucketingInfo) {
- this.bucketingInfo = bucketingInfo;
- }
-
- public boolean getUseOnePass() {
- return useOnePass;
- }
-
- public void setUseOnePass(boolean useOnePass) {
- this.useOnePass = useOnePass;
- }
-
- public String getDictionaryServerHost() {
- return dictionaryServerHost;
- }
-
- public void setDictionaryServerHost(String dictionaryServerHost) {
- this.dictionaryServerHost = dictionaryServerHost;
- }
-
- public int getDictionaryServerPort() {
- return dictionaryServerPort;
- }
-
- public void setDictionaryServerPort(int dictionaryServerPort) {
- this.dictionaryServerPort = dictionaryServerPort;
- }
-
- public boolean isPreFetch() {
- return preFetch;
- }
-
- public void setPreFetch(boolean preFetch) {
- this.preFetch = preFetch;
- }
-
- public long getSchemaUpdatedTimeStamp() {
- return schemaUpdatedTimeStamp;
- }
-
- public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
- this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
- }
-
- public DictionaryCardinalityFinder getCardinalityFinder() {
- return cardinalityFinder;
- }
-
- public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
- this.cardinalityFinder = cardinalityFinder;
- }
-
- public DataType[] getMeasureDataType() {
- List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
- int measureCount = 0;
- for (int i = 0; i < dataFields.length; i++) {
- if (!dataFields[i].getColumn().isDimension()) {
- measureIndexes.add(i);
- measureCount++;
- }
- }
-
- DataType[] type = new DataType[measureCount];
- for (int i = 0; i < type.length; i++) {
- type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
- }
- return type;
- }
-
- public int[] calcDimensionLengths() {
- int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
- if (!isSortTable()) {
- for (int i = 0; i < dimLensWithComplex.length; i++) {
- if (dimLensWithComplex[i] != 0) {
- dimLensWithComplex[i] = Integer.MAX_VALUE;
- }
- }
- }
- List<Integer> dimsLenList = new ArrayList<Integer>();
- for (int eachDimLen : dimLensWithComplex) {
- if (eachDimLen != 0) dimsLenList.add(eachDimLen);
- }
- int[] dimLens = new int[dimsLenList.size()];
- for (int i = 0; i < dimsLenList.size(); i++) {
- dimLens[i] = dimsLenList.get(i);
- }
- return dimLens;
- }
-
- public KeyGenerator[] createKeyGeneratorForComplexDimension() {
- int[] dimLens = calcDimensionLengths();
- KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
- for (int i = 0; i < dimLens.length; i++) {
- complexKeyGenerators[i] =
- KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
- }
- return complexKeyGenerators;
- }
-
- public TableSpec getTableSpec() {
- return tableSpec;
- }
-
- public void setTableSpec(TableSpec tableSpec) {
- this.tableSpec = tableSpec;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
deleted file mode 100644
index 892055b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-
-/**
- * Metadata class for each column of table.
- */
-public class DataField implements Serializable {
-
- public DataField(CarbonColumn column) {
- this.column = column;
- }
-
- private CarbonColumn column;
-
- private String dateFormat;
-
- public boolean hasDictionaryEncoding() {
- return column.hasEncoding(Encoding.DICTIONARY);
- }
-
- public CarbonColumn getColumn() {
- return column;
- }
-
- public String getDateFormat() {
- return dateFormat;
- }
-
- public void setDateFormat(String dateFormat) {
- this.dateFormat = dateFormat;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
deleted file mode 100644
index 36a89b5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.exception.NoRetryException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-
-/**
- * It executes the data load.
- */
-public class DataLoadExecutor {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
-
- private AbstractDataLoadProcessorStep loadProcessorStep;
-
- private boolean isClosed;
-
- public void execute(CarbonLoadModel loadModel, String[] storeLocation,
- CarbonIterator<Object[]>[] inputIterators) throws Exception {
- try {
- loadProcessorStep =
- new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
- // 1. initialize
- loadProcessorStep.initialize();
- LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
- // 2. execute the step
- loadProcessorStep.execute();
- // check and remove any bad record key from bad record entry logger static map
- if (badRecordFound(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())) {
- LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
- } else {
- LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
- }
- } catch (CarbonDataLoadingException e) {
- if (e instanceof BadRecordFoundException) {
- throw new NoRetryException(e.getMessage());
- } else {
- throw e;
- }
- } catch (Exception e) {
- LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
- throw new CarbonDataLoadingException(
- "Data Loading failed for table " + loadModel.getTableName(), e);
- } finally {
- removeBadRecordKey(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
- }
- }
-
- /**
- * This method will remove any bad record key from the map entry
- *
- * @param carbonTableIdentifier
- * @return
- */
- private boolean badRecordFound(CarbonTableIdentifier carbonTableIdentifier) {
- String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
- boolean badRecordKeyFound = false;
- if (null != BadRecordsLogger.hasBadRecord(badRecordLoggerKey)) {
- badRecordKeyFound = true;
- }
- return badRecordKeyFound;
- }
-
- /**
- * This method will remove the bad record key from bad record logger
- *
- * @param carbonTableIdentifier
- */
- private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) {
- String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
- BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
- }
-
- /**
- * Method to clean all the resource
- */
- public void close() {
- if (!isClosed && loadProcessorStep != null) {
- loadProcessorStep.close();
- }
- isClosed = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
deleted file mode 100644
index ccb25e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
-import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * It builds the pipe line of steps for loading data to carbon.
- */
-public final class DataLoadProcessBuilder {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
-
- public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation,
- CarbonIterator[] inputIterators) throws Exception {
- CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
- SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
- if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
- return buildInternalForNoSort(inputIterators, configuration);
- } else if (configuration.getBucketingInfo() != null) {
- return buildInternalForBucketing(inputIterators, configuration);
- } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
- return buildInternalForBatchSort(inputIterators, configuration);
- } else {
- return buildInternal(inputIterators, configuration);
- }
- }
-
- private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
- CarbonDataLoadConfiguration configuration) {
- // 1. Reads the data input iterators and parses the data.
- AbstractDataLoadProcessorStep inputProcessorStep =
- new InputProcessorStepImpl(configuration, inputIterators);
- // 2. Converts the data like dictionary or non dictionary or complex objects depends on
- // data types and configurations.
- AbstractDataLoadProcessorStep converterProcessorStep =
- new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
- // 3. Sorts the data by SortColumn
- AbstractDataLoadProcessorStep sortProcessorStep =
- new SortProcessorStepImpl(configuration, converterProcessorStep);
- // 4. Writes the sorted data in carbondata format.
- return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
- }
-
- private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators,
- CarbonDataLoadConfiguration configuration) {
- // 1. Reads the data input iterators and parses the data.
- AbstractDataLoadProcessorStep inputProcessorStep =
- new InputProcessorStepImpl(configuration, inputIterators);
- // 2. Converts the data like dictionary or non dictionary or complex objects depends on
- // data types and configurations.
- AbstractDataLoadProcessorStep converterProcessorStep =
- new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
- // 3. Writes the sorted data in carbondata format.
- AbstractDataLoadProcessorStep writerProcessorStep =
- new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
- return writerProcessorStep;
- }
-
- private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
- CarbonDataLoadConfiguration configuration) {
- // 1. Reads the data input iterators and parses the data.
- AbstractDataLoadProcessorStep inputProcessorStep =
- new InputProcessorStepImpl(configuration, inputIterators);
- // 2. Converts the data like dictionary or non dictionary or complex objects depends on
- // data types and configurations.
- AbstractDataLoadProcessorStep converterProcessorStep =
- new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
- // 3. Sorts the data by SortColumn or not
- AbstractDataLoadProcessorStep sortProcessorStep =
- new SortProcessorStepImpl(configuration, converterProcessorStep);
- // 4. Writes the sorted data in carbondata format.
- return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
- }
-
- private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
- CarbonDataLoadConfiguration configuration) throws Exception {
- // 1. Reads the data input iterators and parses the data.
- AbstractDataLoadProcessorStep inputProcessorStep =
- new InputProcessorStepImpl(configuration, inputIterators);
- // 2. Converts the data like dictionary or non dictionary or complex objects depends on
- // data types and configurations.
- AbstractDataLoadProcessorStep converterProcessorStep =
- new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
- // 3. Sorts the data by SortColumn or not
- AbstractDataLoadProcessorStep sortProcessorStep =
- new SortProcessorStepImpl(configuration, converterProcessorStep);
- // 4. Writes the sorted data in carbondata format.
- return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
- }
-
- public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
- String[] storeLocation) {
- CarbonDataProcessorUtil.createLocations(storeLocation);
-
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = CarbonDataProcessorUtil
- .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
- loadModel.getTaskNo(), false, false);
- CarbonProperties.getInstance().addProperty(tempLocationKey,
- StringUtils.join(storeLocation, File.pathSeparator));
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
-
- return createConfiguration(loadModel);
- }
-
- public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel) {
- CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- configuration.setTableIdentifier(identifier);
- configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
- configuration.setHeader(loadModel.getCsvHeaderColumns());
- configuration.setPartitionId(loadModel.getPartitionId());
- configuration.setSegmentId(loadModel.getSegmentId());
- configuration.setTaskNo(loadModel.getTaskNo());
- configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
- new String[] { loadModel.getComplexDelimiterLevel1(),
- loadModel.getComplexDelimiterLevel2() });
- configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
- loadModel.getSerializationNullFormat().split(",")[1]);
- configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
- loadModel.getFactTimeStamp());
- configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
- loadModel.getBadRecordsLoggerEnable().split(",")[1]);
- configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
- loadModel.getBadRecordsAction().split(",")[1]);
- configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
- loadModel.getIsEmptyDataBadRecord().split(",")[1]);
- configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
- loadModel.getFactFilePath());
- configuration
- .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
- configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- loadModel.getBatchSortSizeInMb());
- configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- loadModel.getGlobalSortPartitions());
- configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- loadModel.getBadRecordsLocation());
- CarbonMetadata.getInstance().addCarbonTable(carbonTable);
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
- Map<String, String> dateFormatMap =
- CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
- List<DataField> dataFields = new ArrayList<>();
- List<DataField> complexDataFields = new ArrayList<>();
-
- // First add dictionary and non dictionary dimensions because these are part of mdk key.
- // And then add complex data types and measures.
- for (CarbonColumn column : dimensions) {
- DataField dataField = new DataField(column);
- dataField.setDateFormat(dateFormatMap.get(column.getColName()));
- if (column.isComplex()) {
- complexDataFields.add(dataField);
- } else {
- dataFields.add(dataField);
- }
- }
- dataFields.addAll(complexDataFields);
- for (CarbonColumn column : measures) {
- // This dummy measure is added when no measure was present. We no need to load it.
- if (!(column.getColName().equals("default_dummy_measure"))) {
- dataFields.add(new DataField(column));
- }
- }
- configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
- configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
- // configuration for one pass load: dictionary server info
- configuration.setUseOnePass(loadModel.getUseOnePass());
- configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
- configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
- configuration.setPreFetch(loadModel.isPreFetch());
- configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
- configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
-
- TableSpec tableSpec = new TableSpec(dimensions, measures);
- configuration.setTableSpec(tableSpec);
- return configuration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
deleted file mode 100644
index 196afdb..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.complexobjects;
-
-public class ArrayObject {
-
- private Object[] data;
-
- public ArrayObject(Object[] data) {
- this.data = data;
- }
-
- public Object[] getData() {
- return data;
- }
-
- public void setData(Object[] data) {
- this.data = data;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
deleted file mode 100644
index d1e0f9b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.complexobjects;
-
-public class StructObject {
-
- private Object[] data;
-
- public StructObject(Object[] data) {
- this.data = data;
- }
-
- public Object[] getData() {
- return data;
- }
-
- public void setData(Object[] data) {
- this.data = data;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
deleted file mode 100644
index 11570b4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.constants;
-
-/**
- * Constants used in data loading. The value of every constant is its own name.
- */
-public final class DataLoadProcessorConstants {
-
-  public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
-  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
-  public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
-
-  public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
-  public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
-  public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
-
-  public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
-
-  /** Constants holder only; it is never meant to be instantiated. */
-  private DataLoadProcessorConstants() {
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
deleted file mode 100644
index f7ce620..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Holder for the bad-record reason. isBadRecordNotAdded() is true after setReason(), until clear().
- */
-public class BadRecordLogHolder {
-
- /**
- * maps a column name to the bad-record message already prepared for that column
- */
- private Map<String, String> columnMessageMap = new HashMap<>();
-
- private String reason;
-
- private boolean badRecordAdded;
-
- private boolean isLogged;
-
- public String getReason() {
- return reason;
- }
-
- public void setReason(String reason) {
- this.reason = reason;
- badRecordAdded = true;
- }
-
- public boolean isBadRecordNotAdded() {
- return badRecordAdded;
- }
-
- public void clear() {
- this.badRecordAdded = false;
- }
-
- public boolean isLogged() {
- return isLogged;
- }
-
- public void setLogged(boolean logged) {
- isLogged = logged;
- }
-
- public Map<String, String> getColumnMessageMap() {
- return columnMessageMap;
- }
-
- /**
- * empties the per-column message map; the reason and both flags are left untouched
- */
- public void finish() {
- if (null != columnMessageMap) {
- columnMessageMap.clear();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
deleted file mode 100644
index 751f909..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter;
-
-/**
- * Finds the current cardinality of dimensions, returned as an int array by getCardinality().
- */
-public interface DictionaryCardinalityFinder {
-
- int[] getCardinality();
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
deleted file mode 100644
index 88828be..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-/**
- * This interface converts/transforms the column field.
- */
-public interface FieldConverter {
-
- /**
- * It converts the column field and updates the data in same location/index in row.
- * @param row row whose field is converted in place
- * @param logHolder receives the bad-record reason; the method itself returns nothing,
- * so a rejected value is signalled through this holder
- * @throws CarbonDataLoadingException
- */
- void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
deleted file mode 100644
index f4876db..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-/**
- * Converts rows; being a DictionaryCardinalityFinder it also exposes getCardinality().
- */
-public interface RowConverter extends DictionaryCardinalityFinder {
-
- void initialize() throws IOException;
-
- CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
-
- RowConverter createCopyForNewThread();
-
- void finish();
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
deleted file mode 100644
index bbbf5e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-
-public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
- // Subclasses in this package append their cardinality entry (or entries) to the given list.
- public abstract void fillColumnCardinality(List<Integer> cardinality);
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
deleted file mode 100644
index 8feea6a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-  private final GenericDataType genericDataType;
-  private final int index;
-
-  public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
-    this.genericDataType = genericDataType;
-    this.index = index;
-  }
-
-  /** Replaces the object at {@code index} with the bytes written by the GenericDataType. */
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    Object object = row.getObject(index);
-    // TODO Its temporary, needs refactor here.
-    ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
-    try {
-      // try-with-resources closes the stream even when writeByteArray fails
-      try (DataOutputStream dataOutputStream = new DataOutputStream(byteArray)) {
-        genericDataType.writeByteArray(object, dataOutputStream);
-      }
-      row.update(byteArray.toByteArray(), index);
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(object + "", e);
-    }
-  }
-
-  @Override public void fillColumnCardinality(List<Integer> cardinality) {
-    genericDataType.fillCardinality(cardinality);
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
deleted file mode 100644
index 8d4d5a3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * Replaces a dictionary dimension's string value in the row with its dictionary key. With
- * one pass the key comes from a DictionaryServerClientDictionary, otherwise from a
- * PreCreatedDictionary over the cached dictionary.
- */
-public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
-
-  private BiDictionary<Integer, Object> dictionaryGenerator;
-  private Dictionary dictionary;
-  private DictionaryMessage dictionaryMessage;
-
-  private int index;
-  private CarbonDimension carbonDimension;
-  private String nullFormat;
-  private boolean isEmptyBadRecord;
-
-  public DictionaryFieldConverterImpl(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
-      DictionaryClient client, boolean useOnePass, String storePath,
-      Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
-    this.index = index;
-    this.carbonDimension = (CarbonDimension) dataField.getColumn();
-    this.nullFormat = nullFormat;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-    DictionaryColumnUniqueIdentifier columnKey = new DictionaryColumnUniqueIdentifier(
-        carbonTableIdentifier, dataField.getColumn().getColumnIdentifier(),
-        dataField.getColumn().getDataType(),
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
-    if (!useOnePass) {
-      // without one pass the cached dictionary is wrapped as a PreCreatedDictionary
-      dictionary = cache.get(columnKey);
-      dictionaryGenerator = new PreCreatedDictionary(dictionary);
-      return;
-    }
-    // if use one pass, use DictionaryServerClientDictionary
-    if (CarbonUtil.isFileExistsForGivenColumn(storePath, columnKey)) {
-      dictionary = cache.get(columnKey);
-    }
-    dictionaryMessage = new DictionaryMessage();
-    dictionaryMessage.setColumnName(dataField.getColumn().getColName());
-    // for table initialization
-    dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
-    dictionaryMessage.setData("0");
-    // for generate dictionary
-    dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
-    dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
-        dictionaryMessage, localCache);
-  }
-
-  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
-      throws CarbonDataLoadingException {
-    try {
-      String rawValue = row.getString(index);
-      String parsedValue = (rawValue == null || rawValue.equals(nullFormat))
-          ? CarbonCommonConstants.MEMBER_DEFAULT_VAL
-          : DataTypeUtil.parseValue(rawValue, carbonDimension);
-      if (parsedValue != null) {
-        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
-        return;
-      }
-      // a null parse result is reported, except for an empty value when isEmptyBadRecord is off
-      if (!rawValue.isEmpty() || isEmptyBadRecord) {
-        String columnName = carbonDimension.getColName();
-        String message = logHolder.getColumnMessageMap().get(columnName);
-        if (message == null) {
-          message = CarbonDataProcessorUtil.prepareFailureReason(
-              columnName, carbonDimension.getDataType());
-          logHolder.getColumnMessageMap().put(columnName, message);
-        }
-        logHolder.setReason(message);
-      }
-      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
-    } catch (DictionaryGenerationException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public void fillColumnCardinality(List<Integer> cardinality) {
-    cardinality.add(dictionaryGenerator.size());
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
deleted file mode 100644
index f269274..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/** Replaces a direct-dictionary column value with its generated surrogate key. */
-public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-
-  private DirectDictionaryGenerator directDictionaryGenerator;
-  private int index;
-  private String nullFormat;
-  private CarbonColumn column;
-  private boolean isEmptyBadRecord;
-
-  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
-      boolean isEmptyBadRecord) {
-    this.nullFormat = nullFormat;
-    this.index = index;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-    this.column = dataField.getColumn();
-
-    String dateFormat = dataField.getDateFormat();
-    // a non-empty date format selects the format-aware generator overload
-    boolean hasDateFormat = dateFormat != null && !dateFormat.isEmpty();
-    this.directDictionaryGenerator = hasDateFormat
-        ? DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(column.getDataType(), dateFormat)
-        : DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(column.getDataType());
-  }
-
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    String value = row.getString(index);
-    if (value == null) {
-      // a missing value is always reported, without consulting the per-column message map
-      logHolder.setReason(
-          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
-      row.update(1, index);
-      return;
-    }
-    if (value.equals(nullFormat)) {
-      row.update(1, index);
-      return;
-    }
-    int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
-    // key 1 is also what null / nullFormat values are stored as above
-    if (key == 1 && (!value.isEmpty() || isEmptyBadRecord)) {
-      String columnName = column.getColName();
-      String message = logHolder.getColumnMessageMap().get(columnName);
-      if (message == null) {
-        message = CarbonDataProcessorUtil.prepareFailureReason(columnName, column.getDataType());
-        logHolder.getColumnMessageMap().put(columnName, message);
-      }
-      logHolder.setReason(message);
-    }
-    row.update(key, index);
-  }
-
-  @Override
-  public void fillColumnCardinality(List<Integer> cardinality) {
-    cardinality.add(Integer.MAX_VALUE);
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
deleted file mode 100644
index 1aada19..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.processing.datatypes.ArrayDataType;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
-import org.apache.carbondata.processing.datatypes.StructDataType;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-
-public class FieldEncoderFactory {
-
-  private static final FieldEncoderFactory INSTANCE = new FieldEncoderFactory();
-
-  private FieldEncoderFactory() {
-  }
-
-  /**
-   * Returns the shared factory. It is created during class initialization, so concurrent
-   * first callers can no longer race and end up with different instances.
-   */
-  public static FieldEncoderFactory getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Creates the FieldConverter for a column: a direct dictionary, dictionary, complex or
-   * non dictionary converter for dimensions and a MeasureFieldConverterImpl for measures.
-   * @param dataField column schema
-   * @param cache dictionary cache.
-   * @param carbonTableIdentifier table identifier
-   * @param index index of column in the row.
-   * @param isEmptyBadRecord handed to every converter except the complex one
-   * @return the converter matching the column; never null
-   */
-  public FieldConverter createFieldEncoder(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
-      DictionaryClient client, Boolean useOnePass, String storePath,
-      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
-      throws IOException {
-    // Dimensions pick a converter by encoding; every other column gets the measure converter.
-    if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-          !dataField.getColumn().isComplex()) {
-        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
-            isEmptyBadRecord);
-      } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
-          !dataField.getColumn().isComplex()) {
-        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
-            index, client, useOnePass, storePath, localCache, isEmptyBadRecord);
-      } else if (dataField.getColumn().isComplex()) {
-        return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, carbonTableIdentifier,
-                client, useOnePass, storePath, localCache), index);
-      } else {
-        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
-      }
-    } else {
-      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
-    }
-  }
-
-  /**
-   * Create parser for the carbon column.
-   */
-  private static GenericDataType createComplexType(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
-    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier, client, useOnePass, storePath, localCache);
-  }
-
-  /**
-   * This method may be called recursively if the carbon column is complex type.
-   *
-   * @return GenericDataType
-   */
-  private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
-    switch (carbonColumn.getDataType()) {
-      case ARRAY:
-        List<CarbonDimension> listOfChildDimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create array parser with complex delimiter
-        ArrayDataType arrayDataType =
-            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return arrayDataType;
-      case STRUCT:
-        List<CarbonDimension> dimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create struct parser with complex delimiter
-        StructDataType structDataType =
-            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : dimensions) {
-          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return structDataType;
-      case MAP:
-        throw new UnsupportedOperationException("Complex type Map is not supported yet");
-      default:
-        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
-            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier, client, useOnePass, storePath, localCache);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
deleted file mode 100644
index 8e20b8f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * {@link FieldConverter} for a measure column. It reads the string stored at one position of a
- * {@link CarbonRow} and overwrites it with the object returned by
- * {@link DataTypeUtil#getMeasureValueBasedOnDataType}, or with {@code null} when the string is
- * missing, empty, equal to the configured null format, or rejected with a
- * {@link NumberFormatException}.
- */
-public class MeasureFieldConverterImpl implements FieldConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
-
-  /** Position inside the row of the value handled by this converter. */
-  private int index;
-
-  /** Data type of the column, captured once at construction time. */
-  private DataType dataType;
-
-  private CarbonMeasure measure;
-
-  /** Input string that stands for a null value. */
-  private String nullformat;
-
-  /** When true, an empty input string is reported to the log holder as a bad record. */
-  private boolean isEmptyBadRecord;
-
-  /**
-   * @param dataField        field whose column is the measure to convert
-   * @param nullformat       input string that stands for a null value
-   * @param index            position of this field's value inside the row
-   * @param isEmptyBadRecord whether an empty input string is reported as a bad record
-   */
-  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
-      boolean isEmptyBadRecord) {
-    this.dataType = dataField.getColumn().getDataType();
-    this.measure = (CarbonMeasure) dataField.getColumn();
-    this.nullformat = nullformat;
-    this.index = index;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-  }
-
-  /**
-   * Converts the value at {@code index} of the given row in place.
-   *
-   * @param row       row whose value is replaced
-   * @param logHolder receives the failure reason for empty (when configured) and unparseable
-   *                  values
-   */
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
-      throws CarbonDataLoadingException {
-    String value = row.getString(index);
-
-    if (value == null || CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value)) {
-      // Only the per-column message cache is filled here; no reason is set on the holder.
-      lookupOrCreateFailureMessage(logHolder);
-      row.update(null, index);
-      return;
-    }
-
-    if (value.length() == 0) {
-      if (isEmptyBadRecord) {
-        logHolder.setReason(lookupOrCreateFailureMessage(logHolder));
-      }
-      row.update(null, index);
-      return;
-    }
-
-    if (value.equals(nullformat)) {
-      row.update(null, index);
-      return;
-    }
-
-    try {
-      row.update(DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure), index);
-    } catch (NumberFormatException e) {
-      LOGGER.warn(
-          "Cant not convert value to Numeric type value. Value considered as null.");
-      logHolder.setReason(
-          CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
-      row.update(null, index);
-    }
-  }
-
-  /**
-   * Returns the failure message stored for this measure in the holder's column message map,
-   * building it and storing it first when the map has no entry for the column yet.
-   */
-  private String lookupOrCreateFailureMessage(BadRecordLogHolder logHolder) {
-    String columnName = measure.getColName();
-    String message = logHolder.getColumnMessageMap().get(columnName);
-    if (null == message) {
-      message = CarbonDataProcessorUtil.prepareFailureReason(columnName, measure.getDataType());
-      logHolder.getColumnMessageMap().put(columnName, message);
-    }
-    return message;
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
deleted file mode 100644
index 4861d78..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * {@link FieldConverter} for a column handled without a dictionary. The string at one position
- * of a {@link CarbonRow} is replaced by the bytes returned from
- * {@link DataTypeUtil#getBytesBasedOnDataTypeForNoDictionaryColumn}; null values, values equal to
- * the null format and values that fail conversion are replaced by a null marker instead.
- */
-public class NonDictionaryFieldConverterImpl implements FieldConverter {
-
-  /** Data type of the column, captured once at construction time. */
-  private DataType dataType;
-
-  /** Position inside the row of the value handled by this converter. */
-  private int index;
-
-  /** Input string that stands for a null value. */
-  private String nullformat;
-
-  private CarbonColumn column;
-
-  /** When true, an empty string that fails conversion is reported as a bad record. */
-  private boolean isEmptyBadRecord;
-
-  private DataField dataField;
-
-  /**
-   * @param dataField        field to convert; also supplies the date format used for conversion
-   * @param nullformat       input string that stands for a null value
-   * @param index            position of this field's value inside the row
-   * @param isEmptyBadRecord whether an empty string that fails conversion is a bad record
-   */
-  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
-      boolean isEmptyBadRecord) {
-    this.dataField = dataField;
-    this.dataType = dataField.getColumn().getDataType();
-    this.column = dataField.getColumn();
-    this.index = index;
-    this.nullformat = nullformat;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-  }
-
-  /**
-   * Converts the value at {@code index} of the given row in place.
-   *
-   * @param row       row whose value is replaced
-   * @param logHolder receives the failure reason for null non-string values and for values
-   *                  whose conversion throws
-   */
-  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    String rawValue = row.getString(index);
-
-    if (rawValue == null) {
-      // A missing value is only reported for non-string columns.
-      if (column.getDataType() != DataType.STRING) {
-        logHolder.setReason(CarbonDataProcessorUtil
-            .prepareFailureReason(column.getColName(), column.getDataType()));
-      }
-      updateWithNullValue(row);
-      return;
-    }
-
-    if (rawValue.equals(nullformat)) {
-      updateWithNullValue(row);
-      return;
-    }
-
-    try {
-      byte[] converted = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(rawValue,
-          dataType, dataField.getDateFormat());
-      row.update(converted, index);
-    } catch (Throwable ex) {
-      // Non-empty values are always reported; empty ones only when configured as bad records.
-      boolean reportAsBadRecord = !rawValue.isEmpty() || isEmptyBadRecord;
-      if (reportAsBadRecord) {
-        String columnName = column.getColName();
-        String message = logHolder.getColumnMessageMap().get(columnName);
-        if (null == message) {
-          message = CarbonDataProcessorUtil.prepareFailureReason(columnName, column.getDataType());
-          logHolder.getColumnMessageMap().put(columnName, message);
-        }
-        logHolder.setReason(message);
-      }
-      updateWithNullValue(row);
-    }
-  }
-
-  /**
-   * Writes the null marker for this column into the row: the default-member byte array for
-   * string columns, the empty byte array for every other data type.
-   */
-  private void updateWithNullValue(CarbonRow row) {
-    Object nullMarker = dataType == DataType.STRING
-        ? CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
-        : CarbonCommonConstants.EMPTY_BYTE_ARRAY;
-    row.update(nullMarker, index);
-  }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
deleted file mode 100644
index eecb0e9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.newflow.converter.RowConverter;
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-
-/**
- * It converts the complete row if necessary, dictionary columns are encoded with dictionary values
- * and nondictionary values are converted to binary.
- *
- * <p>One {@link FieldConverter} is created per {@link DataField}; {@link #convert(CarbonRow)}
- * applies them in field order and hands rows flagged by a converter to the
- * {@link BadRecordsLogger}.
- */
-public class RowConverterImpl implements RowConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowConverterImpl.class.getName());
-
-  private CarbonDataLoadConfiguration configuration;
-
-  private DataField[] fields;
-
-  private FieldConverter[] fieldConverters;
-
-  private BadRecordsLogger badRecordLogger;
-
-  private BadRecordLogHolder logHolder;
-
-  /** Every dictionary client created by this instance, including those made for copies. */
-  private List<DictionaryClient> dictClients = new ArrayList<>();
-
-  /** Lazily created pool used to start dictionary clients. */
-  private ExecutorService executorService;
-
-  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
-
-  /** One local cache per field, handed to the field encoders of this instance and its copies. */
-  private Map<Object, Integer>[] localCaches;
-
-  /**
-   * @param fields          fields of the row, in the order their values appear in the row
-   * @param configuration   load configuration that supplies table identity and load properties
-   * @param badRecordLogger sink for rows that a field converter flags as bad
-   */
-  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
-      BadRecordsLogger badRecordLogger) {
-    this.fields = fields;
-    this.configuration = configuration;
-    this.badRecordLogger = badRecordLogger;
-  }
-
-  /**
-   * Creates the reverse-dictionary cache, the per-field local caches, a dictionary client (for
-   * one-pass loads) and one field converter per field, and records the elapsed time as LRU cache
-   * load time.
-   *
-   * @throws IOException if a field encoder cannot be created
-   */
-  @Override
-  public void initialize() throws IOException {
-    CacheProvider cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
-        configuration.getTableIdentifier().getStorePath());
-    String nullFormat = readNullFormat();
-    boolean isEmptyBadRecord = readEmptyDataBadRecordFlag();
-    List<FieldConverter> fieldConverterList = new ArrayList<>();
-    // Generic arrays cannot be created directly; every slot is filled with a typed map below.
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    Map<Object, Integer>[] caches = new Map[fields.length];
-    localCaches = caches;
-    long lruCacheStartTime = System.currentTimeMillis();
-    DictionaryClient client = createDictionaryClient();
-    dictClients.add(client);
-
-    for (int i = 0; i < fields.length; i++) {
-      localCaches[i] = new ConcurrentHashMap<>();
-      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-              configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-              localCaches[i], isEmptyBadRecord);
-      fieldConverterList.add(fieldConverter);
-    }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
-    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
-    logHolder = new BadRecordLogHolder();
-  }
-
-  /** Reads the serialization null format from the load properties. */
-  private String readNullFormat() {
-    return configuration
-        .getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT).toString();
-  }
-
-  /** Reads from the load properties whether empty data counts as a bad record. */
-  private boolean readEmptyDataBadRecordFlag() {
-    return Boolean.parseBoolean(configuration
-        .getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD).toString());
-  }
-
-  /**
-   * Starts a dictionary client on the executor when the load uses one pass.
-   *
-   * @return the started client, or {@code null} when one pass is not enabled
-   * @throws RuntimeException if starting the client fails or the calling thread is interrupted
-   *                          while waiting; in the latter case the interrupt status is restored
-   */
-  private DictionaryClient createDictionaryClient() {
-    // for one pass load, start the dictionary client
-    if (!configuration.getUseOnePass()) {
-      return null;
-    }
-    if (executorService == null) {
-      executorService = Executors.newCachedThreadPool();
-    }
-    Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
-      @Override
-      public DictionaryClient call() throws Exception {
-        Thread.currentThread().setName("Dictionary client");
-        DictionaryClient dictionaryClient = new DictionaryClient();
-        dictionaryClient.startClient(configuration.getDictionaryServerHost(),
-            configuration.getDictionaryServerPort());
-        return dictionaryClient;
-      }
-    });
-
-    try {
-      // wait for client initialization finished, or will raise null pointer exception
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOGGER.error(e);
-      // Keep the interruption visible to callers once it is wrapped in an unchecked exception.
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-
-    try {
-      return result.get();
-    } catch (InterruptedException e) {
-      // Keep the interruption visible to callers once it is wrapped in an unchecked exception.
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Runs every field converter over the row in field order. The first time a converter flags
-   * the row, a copy of the unconverted row is passed to the bad record logger together with
-   * the reason.
-   *
-   * @param row row to convert in place
-   * @return the converted row, or {@code null} when a bad record was found and the bad record
-   *         logger reports that conversion to null is disabled
-   * @throws BadRecordFoundException if a bad record was found and the bad record logger reports
-   *                                 that the data load must fail
-   */
-  @Override
-  public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
-    //TODO: only copy if it is bad record
-    CarbonRow copy = row.getCopy();
-    logHolder.setLogged(false);
-    logHolder.clear();
-    for (int i = 0; i < fieldConverters.length; i++) {
-      fieldConverters[i].convert(row, logHolder);
-      if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
-        badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
-        if (badRecordLogger.isDataLoadFail()) {
-          // The space keeps the reason and the hint from running together in the message.
-          String error = "Data load failed due to bad record: " + logHolder.getReason() +
-              " Please enable bad record logger to know the detail reason.";
-          throw new BadRecordFoundException(error);
-        }
-        logHolder.clear();
-        logHolder.setLogged(true);
-        if (badRecordLogger.isBadRecordConvertNullDisable()) {
-          return null;
-        }
-      }
-    }
-    return row;
-  }
-
-  /**
-   * For one-pass loads, shuts down every dictionary client created by this instance, finishes
-   * the log holder and stops the executor. Does nothing when one pass is not enabled.
-   */
-  @Override
-  public void finish() {
-    // close dictionary client when finish write
-    if (configuration.getUseOnePass()) {
-      for (DictionaryClient client : dictClients) {
-        if (client != null) {
-          client.shutDown();
-        }
-      }
-      if (null != logHolder) {
-        logHolder.finish();
-      }
-      if (executorService != null) {
-        executorService.shutdownNow();
-        executorService = null;
-      }
-    }
-  }
-
-  /**
-   * Creates a converter for use by another thread. The copy gets its own field converters,
-   * dictionary client and log holder, while the field encoders are built on this instance's
-   * cache and local caches. The new client is registered with this instance, so it is shut
-   * down by this instance's {@link #finish()}.
-   *
-   * <p>NOTE(review): the copy's own {@code cache} and {@code localCaches} fields are not
-   * assigned here, so this method has to be called on an initialized instance rather than on
-   * a copy.
-   *
-   * @throws RuntimeException if a field encoder cannot be created
-   */
-  @Override
-  public RowConverter createCopyForNewThread() {
-    RowConverterImpl converter =
-        new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
-    DictionaryClient client = createDictionaryClient();
-    dictClients.add(client);
-    String nullFormat = readNullFormat();
-    boolean isEmptyBadRecord = readEmptyDataBadRecordFlag();
-    FieldConverter[] converters = new FieldConverter[fields.length];
-    for (int i = 0; i < fields.length; i++) {
-      try {
-        converters[i] = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
-            configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-            configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-            localCaches[i], isEmptyBadRecord);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    converter.fieldConverters = converters;
-    converter.logHolder = new BadRecordLogHolder();
-    return converter;
-  }
-
-  /**
-   * Collects the column cardinalities reported by the dictionary field converters.
-   *
-   * @return the collected cardinalities in converter order; empty when no field converters
-   *         have been created yet
-   */
-  @Override public int[] getCardinality() {
-    List<Integer> dimCardinality = new ArrayList<>();
-    if (fieldConverters != null) {
-      for (int i = 0; i < fieldConverters.length; i++) {
-        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
-          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
-              .fillColumnCardinality(dimCardinality);
-        }
-      }
-    }
-    int[] cardinality = new int[dimCardinality.size()];
-    for (int i = 0; i < dimCardinality.size(); i++) {
-      cardinality[i] = dimCardinality.get(i);
-    }
-    return cardinality;
-  }
-}