You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:20 UTC
[33/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
new file mode 100644
index 0000000..1e73867
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Replace row data fields with dictionary values if column is configured dictionary encoded.
+ * And nondictionary columns as well as complex columns will be converted to byte[].
+ */
+public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ private List<RowConverter> converters;
+ private BadRecordsLogger badRecordLogger;
+
+ public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override
+ public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ super.initialize();
+ child.initialize();
+ converters = new ArrayList<>();
+ badRecordLogger = createBadRecordLogger(configuration);
+ RowConverter converter =
+ new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ configuration.setCardinalityFinder(converter);
+ converters.add(converter);
+ converter.initialize();
+ }
+
+ /**
+ * Create the iterator using child iterator.
+ *
+ * @param childIter
+ * @return new iterator with step specific processing.
+ */
+ @Override
+ protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+ return new CarbonIterator<CarbonRowBatch>() {
+ private boolean first = true;
+ private RowConverter localConverter;
+ @Override public boolean hasNext() {
+ if (first) {
+ first = false;
+ localConverter = converters.get(0).createCopyForNewThread();
+ synchronized (converters) {
+ converters.add(localConverter);
+ }
+ }
+ return childIter.hasNext();
+ }
+ @Override public CarbonRowBatch next() {
+ return processRowBatch(childIter.next(), localConverter);
+ }
+ };
+ }
+
+ /**
+ * Process the batch of rows as per the step logic.
+ *
+ * @param rowBatch
+ * @return processed row.
+ */
+ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+ CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+ while (rowBatch.hasNext()) {
+ newBatch.addRow(localConverter.convert(rowBatch.next()));
+ }
+ rowCounter.getAndAdd(newBatch.getSize());
+ return newBatch;
+ }
+
+ @Override
+ protected CarbonRow processRow(CarbonRow row) {
+ throw new UnsupportedOperationException();
+ }
+
+ public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
+ boolean badRecordsLogRedirect = false;
+ boolean badRecordConvertNullDisable = false;
+ boolean isDataLoadFail = false;
+ boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+ .toString());
+ Object bad_records_action =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+ .toString();
+ if (null != bad_records_action) {
+ LoggerAction loggerAction = null;
+ try {
+ loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ loggerAction = LoggerAction.FORCE;
+ }
+ switch (loggerAction) {
+ case FORCE:
+ badRecordConvertNullDisable = false;
+ break;
+ case REDIRECT:
+ badRecordsLogRedirect = true;
+ badRecordConvertNullDisable = true;
+ break;
+ case IGNORE:
+ badRecordsLogRedirect = false;
+ badRecordConvertNullDisable = true;
+ break;
+ case FAIL:
+ isDataLoadFail = true;
+ break;
+ }
+ }
+ CarbonTableIdentifier identifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+ identifier.getTableName() + '_' + System.currentTimeMillis(),
+ getBadLogStoreLocation(configuration,
+ identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
+ .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+ badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+ }
+
+ public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
+ String storeLocation) {
+ String badLogStoreLocation = (String) configuration
+ .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+ if (null == badLogStoreLocation) {
+ badLogStoreLocation =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+ }
+ badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+ return badLogStoreLocation;
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ if (null != badRecordLogger) {
+ badRecordLogger.closeStreams();
+ renameBadRecord(badRecordLogger, configuration);
+ }
+ super.close();
+ if (converters != null) {
+ for (RowConverter converter : converters) {
+ if (null != converter) {
+ converter.finish();
+ }
+ }
+ }
+ }
+ }
+
+ public static void close(BadRecordsLogger badRecordLogger, CarbonDataLoadConfiguration
+ configuration, RowConverter converter) {
+ if (badRecordLogger != null) {
+ badRecordLogger.closeStreams();
+ renameBadRecord(badRecordLogger, configuration);
+ }
+ if (converter != null) {
+ converter.finish();
+ }
+ }
+
+ private static void renameBadRecord(BadRecordsLogger badRecordLogger,
+ CarbonDataLoadConfiguration configuration) {
+ // rename operation should be performed only in case either bad reccords loggers is enabled
+ // or bad records redirect is enabled
+ if (badRecordLogger.isBadRecordLoggerEnable() || badRecordLogger.isBadRecordsLogRedirect()) {
+ // rename the bad record in progress to normal
+ CarbonTableIdentifier identifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+ identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+ + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo());
+ }
+ }
+
+ @Override protected String getStepName() {
+ return "Data Converter";
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
new file mode 100644
index 0000000..009c6a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Replace row data fields with dictionary values if column is configured dictionary encoded.
+ * And nondictionary columns as well as complex columns will be converted to byte[].
+ */
+public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
+
+ private List<RowConverter> converters;
+
+ private Partitioner<Object[]> partitioner;
+
+ private BadRecordsLogger badRecordLogger;
+
+ public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override
+ public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ super.initialize();
+ child.initialize();
+ converters = new ArrayList<>();
+ badRecordLogger = createBadRecordLogger();
+ RowConverter converter =
+ new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ configuration.setCardinalityFinder(converter);
+ converters.add(converter);
+ converter.initialize();
+ List<Integer> indexes = new ArrayList<>();
+ List<ColumnSchema> columnSchemas = new ArrayList<>();
+ DataField[] inputDataFields = getOutput();
+ BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+ for (int i = 0; i < inputDataFields.length; i++) {
+ for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+ if (inputDataFields[i].getColumn().getColName()
+ .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+ indexes.add(i);
+ columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+ break;
+ }
+ }
+ }
+ partitioner =
+ new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
+ }
+
+ /**
+ * Create the iterator using child iterator.
+ *
+ * @param childIter
+ * @return new iterator with step specific processing.
+ */
+ @Override
+ protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+ return new CarbonIterator<CarbonRowBatch>() {
+ RowConverter localConverter;
+ private boolean first = true;
+ @Override public boolean hasNext() {
+ if (first) {
+ first = false;
+ localConverter = converters.get(0).createCopyForNewThread();
+ converters.add(localConverter);
+ }
+ return childIter.hasNext();
+ }
+
+ @Override public CarbonRowBatch next() {
+ return processRowBatch(childIter.next(), localConverter);
+ }
+ };
+ }
+
+ /**
+ * Process the batch of rows as per the step logic.
+ *
+ * @param rowBatch
+ * @return processed row.
+ */
+ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+ CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+ while (rowBatch.hasNext()) {
+ CarbonRow next = rowBatch.next();
+ short bucketNumber = (short) partitioner.getPartition(next.getData());
+ CarbonRow convertRow = localConverter.convert(next);
+ convertRow.bucketNumber = bucketNumber;
+ newBatch.addRow(convertRow);
+ }
+ rowCounter.getAndAdd(newBatch.getSize());
+ return newBatch;
+ }
+
+ @Override
+ protected CarbonRow processRow(CarbonRow row) {
+ throw new UnsupportedOperationException();
+ }
+
+ private BadRecordsLogger createBadRecordLogger() {
+ boolean badRecordsLogRedirect = false;
+ boolean badRecordConvertNullDisable = false;
+ boolean isDataLoadFail = false;
+ boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+ .toString());
+ Object bad_records_action =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+ .toString();
+ if (null != bad_records_action) {
+ LoggerAction loggerAction = null;
+ try {
+ loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ loggerAction = LoggerAction.FORCE;
+ }
+ switch (loggerAction) {
+ case FORCE:
+ badRecordConvertNullDisable = false;
+ break;
+ case REDIRECT:
+ badRecordsLogRedirect = true;
+ badRecordConvertNullDisable = true;
+ break;
+ case IGNORE:
+ badRecordsLogRedirect = false;
+ badRecordConvertNullDisable = true;
+ break;
+ case FAIL:
+ isDataLoadFail = true;
+ break;
+ }
+ }
+ CarbonTableIdentifier identifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+ identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
+ identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+ + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+ badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+ }
+
+ private String getBadLogStoreLocation(String storeLocation) {
+ String badLogStoreLocation = (String) configuration
+ .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+ if (null == badLogStoreLocation) {
+ badLogStoreLocation =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+ }
+ badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+ return badLogStoreLocation;
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ super.close();
+ if (null != badRecordLogger) {
+ badRecordLogger.closeStreams();
+ renameBadRecord(configuration);
+ }
+ if (converters != null) {
+ for (RowConverter converter : converters) {
+ converter.finish();
+ }
+ }
+ }
+ }
+ private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+ // rename the bad record in progress to normal
+ CarbonTableIdentifier identifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+ identifier.getDatabaseName() + File.separator + identifier.getTableName()
+ + File.separator + configuration.getSegmentId() + File.separator + configuration
+ .getTaskNo());
+ }
+ @Override protected String getStepName() {
+ return "Data Converter with Bucketing";
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
new file mode 100644
index 0000000..f030d52
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from batch of sorted files(it could be in-memory/disk based files)
+ * which are generated in previous sort step. And it writes data to carbondata file.
+ * It also generates mdk key while writing to carbondata file
+ */
+public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
+
+ public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ child.initialize();
+ }
+
+ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+ String[] storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+ configuration.getSegmentId() + "", false, false);
+ CarbonDataProcessorUtil.createLocations(storeLocation);
+ return storeLocation;
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+ Iterator<CarbonRowBatch>[] iterators = child.execute();
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ String tableName = tableIdentifier.getTableName();
+ try {
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ int i = 0;
+ for (Iterator<CarbonRowBatch> iterator : iterators) {
+ String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+ int k = 0;
+ while (iterator.hasNext()) {
+ CarbonRowBatch next = iterator.next();
+ // If no rows from merge sorter, then don't create a file in fact column handler
+ if (next.hasNext()) {
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+ CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+ .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+ dataHandler.initialise();
+ processBatch(next, dataHandler);
+ finish(tableName, dataHandler);
+ }
+ }
+ i++;
+ }
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterBatchProcessorStepImpl");
+ if (e.getCause() instanceof BadRecordFoundException) {
+ throw new BadRecordFoundException(e.getCause().getMessage());
+ }
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+ }
+ return null;
+ }
+
+ @Override protected String getStepName() {
+ return "Data Batch Writer";
+ }
+
+ private void finish(String tableName, CarbonFactHandler dataHandler) {
+ try {
+ dataHandler.finish();
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
+ }
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+ processingComplete(dataHandler);
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ }
+
+ private void processingComplete(CarbonFactHandler dataHandler) {
+ if (null != dataHandler) {
+ try {
+ dataHandler.closeHandler();
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw new CarbonDataLoadingException(
+ "There is an unexpected error while closing data handler", e);
+ }
+ }
+ }
+
+ private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) throws Exception {
+ int batchSize = 0;
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ dataHandler.addDataToStore(row);
+ batchSize++;
+ }
+ batch.close();
+ rowCounter.getAndAdd(batchSize);
+ }
+
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
new file mode 100644
index 0000000..e8e2b0e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from sorted files which are generated in previous sort step.
+ * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
+ */
+public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
+
+ private long readCounter;
+
+ public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
+ super(configuration, null);
+ }
+
+ @Override public DataField[] getOutput() {
+ return child.getOutput();
+ }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ child.initialize();
+ }
+
+ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+ String[] storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+ configuration.getSegmentId() + "", false, false);
+ CarbonDataProcessorUtil.createLocations(storeLocation);
+ return storeLocation;
+ }
+
+ public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0);
+ return model;
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+ Iterator<CarbonRowBatch>[] iterators = child.execute();
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ String tableName = tableIdentifier.getTableName();
+ try {
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ int i = 0;
+ for (Iterator<CarbonRowBatch> iterator : iterators) {
+ String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
+ CarbonFactHandler dataHandler = null;
+ boolean rowsNotExist = true;
+ while (iterator.hasNext()) {
+ if (rowsNotExist) {
+ rowsNotExist = false;
+ dataHandler = CarbonFactHandlerFactory
+ .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+ dataHandler.initialise();
+ }
+ processBatch(iterator.next(), dataHandler);
+ }
+ if (!rowsNotExist) {
+ finish(dataHandler);
+ }
+ i++;
+ }
+
+ } catch (CarbonDataWriterException e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+ throw new CarbonDataLoadingException(
+ "Error while initializing data handler : " + e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+ }
+ return null;
+ }
+
+ @Override protected String getStepName() {
+ return "Data Writer";
+ }
+
+ public void finish(CarbonFactHandler dataHandler) {
+ CarbonTableIdentifier tableIdentifier =
+ configuration.getTableIdentifier().getCarbonTableIdentifier();
+ String tableName = tableIdentifier.getTableName();
+
+ try {
+ dataHandler.finish();
+ } catch (Exception e) {
+ LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler");
+ }
+ LOGGER.info("Record Processed For table: " + tableName);
+ String logMessage =
+ "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
+ + rowCounter.get();
+ LOGGER.info(logMessage);
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+ processingComplete(dataHandler);
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ System.currentTimeMillis());
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ }
+
+ private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
+ if (null != dataHandler) {
+ try {
+ dataHandler.closeHandler();
+ } catch (CarbonDataWriterException e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonDataLoadingException(e.getMessage(), e);
+ } catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
+ throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
+ }
+ }
+ }
+
+ private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
+ throws CarbonDataLoadingException {
+ try {
+ while (batch.hasNext()) {
+ CarbonRow row = batch.next();
+ dataHandler.addDataToStore(row);
+ readCounter++;
+ }
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ rowCounter.getAndAdd(batch.getSize());
+ }
+
+ public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
+ try {
+ readCounter++;
+ dataHandler.addDataToStore(row);
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+ }
+ rowCounter.getAndAdd(1);
+ }
+
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
new file mode 100644
index 0000000..70a1254
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+
+/**
+ * It reads data from record reader and sends data to next step.
+ */
+public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ // Parses each raw Object[] record into the row layout expected downstream.
+ private RowParser rowParser;
+
+ // Raw record iterators supplied by the caller; partitioned across threads.
+ private CarbonIterator<Object[]>[] inputIterators;
+
+ /**
+ * executor service to execute the query
+ */
+ public ExecutorService executorService;
+
+ public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ CarbonIterator<Object[]>[] inputIterators) {
+ super(configuration, null);
+ this.inputIterators = inputIterators;
+ }
+
+ @Override public DataField[] getOutput() {
+ // First step of the chain: output fields come straight from configuration.
+ return configuration.getDataFields();
+ }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ rowParser = new RowParserImpl(getOutput(), configuration);
+ // Cached pool: threads are created on demand for pre-fetch batch reads.
+ executorService = Executors.newCachedThreadPool();
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() {
+ int batchSize = CarbonProperties.getInstance().getBatchSize();
+ // One output iterator per partition of the input reader iterators.
+ List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+ Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+ for (int i = 0; i < outIterators.length; i++) {
+ outIterators[i] =
+ new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
+ configuration.isPreFetch(), executorService, rowCounter);
+ }
+ return outIterators;
+ }
+
+ /**
+ * Partition input iterators equally as per the number of threads.
+ * @return array of iterator lists, one list per parallel thread
+ */
+ private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+ // Get the number of cores configured in property.
+ int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+ // Get the minimum of number of cores and iterators size to get the number of parallel threads
+ // to be launched.
+ int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+ List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+ for (int i = 0; i < parallelThreadNumber; i++) {
+ iterators[i] = new ArrayList<>();
+ }
+ // Equally partition the iterators as per number of threads (round-robin).
+ for (int i = 0; i < inputIterators.length; i++) {
+ iterators[i % parallelThreadNumber].add(inputIterators[i]);
+ }
+ return iterators;
+ }
+
+ // Unused template hook: this step produces batches via execute(), not rows.
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+ @Override public void close() {
+ if (!closed) {
+ super.close();
+ // NOTE(review): executorService is created in initialize(); if close()
+ // can be reached before initialize() this would NPE -- confirm lifecycle.
+ executorService.shutdown();
+ for (CarbonIterator inputIterator : inputIterators) {
+ inputIterator.close();
+ }
+ }
+ }
+
+ @Override protected String getStepName() {
+ return "Input Processor";
+ }
+
+ /**
+ * This iterator wraps the list of iterators and it starts iterating the each
+ * iterator of the list one by one. It also parse the data while iterating it.
+ */
+ private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private List<CarbonIterator<Object[]>> inputIterators;
+
+ // Iterator currently being drained from the list.
+ private CarbonIterator<Object[]> currentIterator;
+
+ // Index into inputIterators of the next iterator to pick up.
+ private int counter;
+
+ // Maximum number of rows collected into one CarbonRowBatch.
+ private int batchSize;
+
+ private RowParser rowParser;
+
+ // Pending asynchronously-read batch when preFetch is enabled.
+ private Future<CarbonRowBatch> future;
+
+ private ExecutorService executorService;
+
+ // True while a pre-fetched batch is outstanding, keeping hasNext() true.
+ private boolean nextBatch;
+
+ // Guards one-time lazy initialization of the first input iterator.
+ private boolean firstTime;
+
+ // Whether the next batch is fetched ahead of consumption on the executor.
+ private boolean preFetch;
+
+ // Shared row counter owned by the enclosing step.
+ private AtomicLong rowCounter;
+
+ public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
+ RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService,
+ AtomicLong rowCounter) {
+ this.inputIterators = inputIterators;
+ this.batchSize = batchSize;
+ this.rowParser = rowParser;
+ this.counter = 0;
+ // Get the first iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ this.executorService = executorService;
+ this.rowCounter = rowCounter;
+ this.preFetch = preFetch;
+ this.nextBatch = false;
+ this.firstTime = true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // nextBatch is set while a pre-fetched batch is pending delivery.
+ return nextBatch || internalHasNext();
+ }
+
+ private boolean internalHasNext() {
+ if (firstTime) {
+ // Lazily initialize the first iterator on the first call.
+ firstTime = false;
+ currentIterator.initialize();
+ }
+ boolean hasNext = currentIterator.hasNext();
+ // If iterator is finished then check for next iterator.
+ if (!hasNext) {
+ currentIterator.close();
+ // Check next iterator is available in the list.
+ if (counter < inputIterators.size()) {
+ // Get the next iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ // Initialize the new iterator
+ currentIterator.initialize();
+ // Recurse in case the freshly opened iterator is itself empty.
+ hasNext = internalHasNext();
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public CarbonRowBatch next() {
+ if (preFetch) {
+ return getCarbonRowBatchWithPreFetch();
+ } else {
+ return getBatch();
+ }
+ }
+
+ private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
+ CarbonRowBatch result = null;
+ if (future == null) {
+ // First call: nothing pre-fetched yet, submit the initial read.
+ future = getCarbonRowBatch();
+ }
+ try {
+ result = future.get();
+ } catch (InterruptedException e) {
+ // NOTE(review): interrupt status is not restored before rethrowing --
+ // consider Thread.currentThread().interrupt() here.
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ nextBatch = false;
+ if (hasNext()) {
+ // Kick off the read of the following batch before returning this one.
+ nextBatch = true;
+ future = getCarbonRowBatch();
+ }
+
+ return result;
+ }
+
+ // Submits an asynchronous batch read to the shared executor.
+ private Future<CarbonRowBatch> getCarbonRowBatch() {
+ return executorService.submit(new Callable<CarbonRowBatch>() {
+ @Override public CarbonRowBatch call() throws Exception {
+ return getBatch();
+
+ }
+ });
+ }
+
+ private CarbonRowBatch getBatch() {
+ // Create batch and fill it.
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+ int count = 0;
+ while (internalHasNext() && count < batchSize) {
+ carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+ count++;
+ }
+ rowCounter.getAndAdd(carbonRowBatch.getSize());
+ return carbonRowBatch;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
new file mode 100644
index 0000000..856d68c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.Sorter;
+import org.apache.carbondata.processing.loading.sort.SorterFactory;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * It sorts the data and write them to intermediate temp files. These files will be further read
+ * by next step for writing to carbondata files.
+ */
+public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+ // Sorter implementation chosen by SorterFactory from the configuration.
+ private Sorter sorter;
+
+ public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+ AbstractDataLoadProcessorStep child) {
+ super(configuration, child);
+ }
+
+ @Override
+ public DataField[] getOutput() {
+ // Sorting does not change the schema; pass the child's fields through.
+ return child.getOutput();
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ super.initialize();
+ child.initialize();
+ SortParameters sortParameters = SortParameters.createSortParameters(configuration);
+ sorter = SorterFactory.createSorter(configuration, rowCounter);
+ sorter.initialize(sortParameters);
+ }
+
+ @Override
+ public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+ // Pull batch iterators from the child step and hand them to the sorter.
+ final Iterator<CarbonRowBatch>[] iterators = child.execute();
+ return sorter.sort(iterators);
+ }
+
+ @Override
+ protected CarbonRow processRow(CarbonRow row) {
+ // Unused template hook: sorting operates on whole batches in execute().
+ return null;
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ super.close();
+ // sorter can still be null if initialize() failed before creating it.
+ if (sorter != null) {
+ sorter.close();
+ }
+ }
+ }
+
+ @Override protected String getStepName() {
+ return "Sort Processor";
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index f76c66f..aa77fb6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 838e5be..8631e1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
/**
* utility class for load merging.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 095e5a3..edffae9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -33,15 +33,15 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.spark.sql.types.Decimal;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index ef53163..f82f365 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.exception.SliceMergerException;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
deleted file mode 100644
index 3ae3604..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger.exeception;
-
-import java.util.Locale;
-
-public class SliceMergerException extends Exception {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceMergerException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceMergerException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
deleted file mode 100644
index cbdd7b4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.model;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-/**
- * Wrapper Data Load Schema object which will be used to
- * support relation while data loading
- */
-public class CarbonDataLoadSchema implements Serializable {
-
- /**
- * default serializer
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * CarbonTable info
- */
- private CarbonTable carbonTable;
-
- /**
- * CarbonDataLoadSchema constructor which takes CarbonTable
- *
- * @param carbonTable
- */
- public CarbonDataLoadSchema(CarbonTable carbonTable) {
- this.carbonTable = carbonTable;
- }
-
- /**
- * get carbontable
- *
- * @return carbonTable
- */
- public CarbonTable getCarbonTable() {
- return carbonTable;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
deleted file mode 100644
index be2c8a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ /dev/null
@@ -1,764 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.model;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
-public class CarbonLoadModel implements Serializable {
-
- private static final long serialVersionUID = 6580168429197697465L;
-
- private String databaseName;
-
- private String tableName;
-
- private String factFilePath;
-
- private String colDictFilePath;
-
- private String partitionId;
-
- private CarbonDataLoadSchema carbonDataLoadSchema;
-
- private boolean aggLoadRequest;
-
- private String storePath;
-
- private boolean isRetentionRequest;
-
- private String csvHeader;
- private String[] csvHeaderColumns;
- private String csvDelimiter;
- private String complexDelimiterLevel1;
- private String complexDelimiterLevel2;
-
- private boolean isDirectLoad;
- private List<LoadMetadataDetails> loadMetadataDetails;
- private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
-
- private String blocksID;
-
- /**
- * Map from carbon dimension to pre defined dict file path
- */
- private HashMap<CarbonDimension, String> predefDictMap;
-
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
- /**
- * new load start time
- */
- private long factTimeStamp;
- /**
- * load Id
- */
- private String segmentId;
-
- private String allDictPath;
-
- /**
- * escape Char
- */
- private String escapeChar;
-
- /**
- * quote Char
- */
- private String quoteChar;
-
- /**
- * comment Char
- */
- private String commentChar;
-
- private String dateFormat;
-
- private String defaultTimestampFormat;
-
- private String defaultDateFormat;
-
- /**
- * defines the string that should be treated as null while loadind data
- */
- private String serializationNullFormat;
-
- /**
- * defines the string to specify whether the bad record logger should be enabled or not
- */
- private String badRecordsLoggerEnable;
-
- /**
- * defines the option to specify the bad record logger action
- */
- private String badRecordsAction;
-
- /**
- * Max number of columns that needs to be parsed by univocity parser
- */
- private String maxColumns;
-
- /**
- * defines the string to specify whether empty data is good or bad
- */
- private String isEmptyDataBadRecord;
-
- /**
- * Use one pass to generate dictionary
- */
- private boolean useOnePass;
-
- /**
- * dictionary server host
- */
- private String dictionaryServerHost;
-
- /**
- * dictionary sever port
- */
- private int dictionaryServerPort;
-
- /**
- * Pre fetch data from csv reader
- */
- private boolean preFetch;
-
- /**
- * Batch sort should be enabled or not
- */
- private String sortScope;
-
- /**
- * Batch sort size in mb.
- */
- private String batchSortSizeInMb;
- /**
- * bad record location
- */
- private String badRecordsLocation;
-
- /**
- * Number of partitions in global sort.
- */
- private String globalSortPartitions;
-
- /**
- * get escape char
- *
- * @return
- */
- public String getEscapeChar() {
- return escapeChar;
- }
-
- /**
- * set escape char
- *
- * @param escapeChar
- */
- public void setEscapeChar(String escapeChar) {
- this.escapeChar = escapeChar;
- }
-
- public String getCsvDelimiter() {
- return csvDelimiter;
- }
-
- public void setCsvDelimiter(String csvDelimiter) {
- this.csvDelimiter = csvDelimiter;
- }
-
- public String getComplexDelimiterLevel1() {
- return complexDelimiterLevel1;
- }
-
- public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
- this.complexDelimiterLevel1 = complexDelimiterLevel1;
- }
-
- public String getComplexDelimiterLevel2() {
- return complexDelimiterLevel2;
- }
-
- public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
- this.complexDelimiterLevel2 = complexDelimiterLevel2;
- }
-
- public boolean isDirectLoad() {
- return isDirectLoad;
- }
-
- public void setDirectLoad(boolean isDirectLoad) {
- this.isDirectLoad = isDirectLoad;
- }
-
- public String getAllDictPath() {
- return allDictPath;
- }
-
- public void setAllDictPath(String allDictPath) {
- this.allDictPath = allDictPath;
- }
-
- public String getCsvHeader() {
- return csvHeader;
- }
-
- public void setCsvHeader(String csvHeader) {
- this.csvHeader = csvHeader;
- }
-
- public String[] getCsvHeaderColumns() {
- return csvHeaderColumns;
- }
-
- public void setCsvHeaderColumns(String[] csvHeaderColumns) {
- this.csvHeaderColumns = csvHeaderColumns;
- }
-
- public void initPredefDictMap() {
- predefDictMap = new HashMap<>();
- }
-
- public String getPredefDictFilePath(CarbonDimension dimension) {
- return predefDictMap.get(dimension);
- }
-
- public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
- this.predefDictMap.put(dimension, predefDictFilePath);
- }
-
- /**
- * @return carbon dataload schema
- */
- public CarbonDataLoadSchema getCarbonDataLoadSchema() {
- return carbonDataLoadSchema;
- }
-
- /**
- * @param carbonDataLoadSchema
- */
- public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
- this.carbonDataLoadSchema = carbonDataLoadSchema;
- }
-
- /**
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @param databaseName the databaseName to set
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @param tableName the tableName to set
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return the factFilePath
- */
- public String getFactFilePath() {
- return factFilePath;
- }
-
- /**
- * @param factFilePath the factFilePath to set
- */
- public void setFactFilePath(String factFilePath) {
- this.factFilePath = factFilePath;
- }
-
- /**
- * @return external column dictionary file path
- */
- public String getColDictFilePath() {
- return colDictFilePath;
- }
-
- /**
- * set external column dictionary file path
- *
- * @param colDictFilePath
- */
- public void setColDictFilePath(String colDictFilePath) {
- this.colDictFilePath = colDictFilePath;
- }
-
- /**
- * get copy with partition
- *
- * @param uniqueId
- * @return
- */
- public CarbonLoadModel getCopyWithPartition(String uniqueId) {
- CarbonLoadModel copy = new CarbonLoadModel();
- copy.tableName = tableName;
- copy.factFilePath = factFilePath + '/' + uniqueId;
- copy.databaseName = databaseName;
- copy.partitionId = uniqueId;
- copy.aggLoadRequest = aggLoadRequest;
- copy.loadMetadataDetails = loadMetadataDetails;
- copy.isRetentionRequest = isRetentionRequest;
- copy.complexDelimiterLevel1 = complexDelimiterLevel1;
- copy.complexDelimiterLevel2 = complexDelimiterLevel2;
- copy.carbonDataLoadSchema = carbonDataLoadSchema;
- copy.blocksID = blocksID;
- copy.taskNo = taskNo;
- copy.factTimeStamp = factTimeStamp;
- copy.segmentId = segmentId;
- copy.serializationNullFormat = serializationNullFormat;
- copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copy.badRecordsAction = badRecordsAction;
- copy.escapeChar = escapeChar;
- copy.quoteChar = quoteChar;
- copy.commentChar = commentChar;
- copy.dateFormat = dateFormat;
- copy.defaultTimestampFormat = defaultTimestampFormat;
- copy.maxColumns = maxColumns;
- copy.storePath = storePath;
- copy.useOnePass = useOnePass;
- copy.dictionaryServerHost = dictionaryServerHost;
- copy.dictionaryServerPort = dictionaryServerPort;
- copy.preFetch = preFetch;
- copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
- copy.sortScope = sortScope;
- copy.batchSortSizeInMb = batchSortSizeInMb;
- copy.badRecordsLocation = badRecordsLocation;
- return copy;
- }
-
- /**
- * Get copy with taskNo.
- * Broadcast value is shared in process, so we need to copy it to make sure the value in each
- * task independently.
- *
- * @return
- */
- public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
- CarbonLoadModel copy = new CarbonLoadModel();
- copy.tableName = tableName;
- copy.factFilePath = factFilePath;
- copy.databaseName = databaseName;
- copy.partitionId = partitionId;
- copy.aggLoadRequest = aggLoadRequest;
- copy.loadMetadataDetails = loadMetadataDetails;
- copy.isRetentionRequest = isRetentionRequest;
- copy.csvHeader = csvHeader;
- copy.csvHeaderColumns = csvHeaderColumns;
- copy.isDirectLoad = isDirectLoad;
- copy.csvDelimiter = csvDelimiter;
- copy.complexDelimiterLevel1 = complexDelimiterLevel1;
- copy.complexDelimiterLevel2 = complexDelimiterLevel2;
- copy.carbonDataLoadSchema = carbonDataLoadSchema;
- copy.blocksID = blocksID;
- copy.taskNo = taskNo;
- copy.factTimeStamp = factTimeStamp;
- copy.segmentId = segmentId;
- copy.serializationNullFormat = serializationNullFormat;
- copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copy.badRecordsAction = badRecordsAction;
- copy.escapeChar = escapeChar;
- copy.quoteChar = quoteChar;
- copy.commentChar = commentChar;
- copy.dateFormat = dateFormat;
- copy.defaultTimestampFormat = defaultTimestampFormat;
- copy.maxColumns = maxColumns;
- copy.storePath = storePath;
- copy.useOnePass = useOnePass;
- copy.dictionaryServerHost = dictionaryServerHost;
- copy.dictionaryServerPort = dictionaryServerPort;
- copy.preFetch = preFetch;
- copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
- copy.sortScope = sortScope;
- copy.batchSortSizeInMb = batchSortSizeInMb;
- return copy;
- }
-
- /**
- * get CarbonLoadModel with partition
- *
- * @param uniqueId
- * @param filesForPartition
- * @param header
- * @param delimiter
- * @return
- */
- public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
- String header, String delimiter) {
- CarbonLoadModel copyObj = new CarbonLoadModel();
- copyObj.tableName = tableName;
- copyObj.factFilePath = null;
- copyObj.databaseName = databaseName;
- copyObj.partitionId = uniqueId;
- copyObj.aggLoadRequest = aggLoadRequest;
- copyObj.loadMetadataDetails = loadMetadataDetails;
- copyObj.isRetentionRequest = isRetentionRequest;
- copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
- copyObj.csvHeader = header;
- copyObj.csvHeaderColumns = csvHeaderColumns;
- copyObj.isDirectLoad = true;
- copyObj.csvDelimiter = delimiter;
- copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
- copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
- copyObj.blocksID = blocksID;
- copyObj.taskNo = taskNo;
- copyObj.factTimeStamp = factTimeStamp;
- copyObj.segmentId = segmentId;
- copyObj.serializationNullFormat = serializationNullFormat;
- copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copyObj.badRecordsAction = badRecordsAction;
- copyObj.escapeChar = escapeChar;
- copyObj.quoteChar = quoteChar;
- copyObj.commentChar = commentChar;
- copyObj.dateFormat = dateFormat;
- copyObj.defaultTimestampFormat = defaultTimestampFormat;
- copyObj.maxColumns = maxColumns;
- copyObj.storePath = storePath;
- copyObj.useOnePass = useOnePass;
- copyObj.dictionaryServerHost = dictionaryServerHost;
- copyObj.dictionaryServerPort = dictionaryServerPort;
- copyObj.preFetch = preFetch;
- copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
- copyObj.sortScope = sortScope;
- copyObj.batchSortSizeInMb = batchSortSizeInMb;
- copyObj.badRecordsLocation = badRecordsLocation;
- return copyObj;
- }
-
- /**
- * @return the partitionId
- */
- public String getPartitionId() {
- return partitionId;
- }
-
- /**
- * @param partitionId the partitionId to set
- */
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
- /**
- * @param storePath The storePath to set.
- */
- public void setStorePath(String storePath) {
- this.storePath = storePath;
- }
-
- /**
- * @return Returns the factStoreLocation.
- */
- public String getStorePath() {
- return storePath;
- }
-
- /**
- * isRetentionRequest
- *
- * @return
- */
- public boolean isRetentionRequest() {
- return isRetentionRequest;
- }
-
- /**
- * getLoadMetadataDetails.
- *
- * @return
- */
- public List<LoadMetadataDetails> getLoadMetadataDetails() {
- return loadMetadataDetails;
- }
-
- /**
- * setLoadMetadataDetails.
- *
- * @param loadMetadataDetails
- */
- public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
- this.loadMetadataDetails = loadMetadataDetails;
- }
-
- /**
- * getSegmentUpdateStatusManager
- *
- * @return
- */
- public SegmentUpdateStatusManager getSegmentUpdateStatusManager() {
- return segmentUpdateStatusManager;
- }
-
- /**
- * setSegmentUpdateStatusManager
- *
- * @param segmentUpdateStatusManager
- */
- public void setSegmentUpdateStatusManager(SegmentUpdateStatusManager segmentUpdateStatusManager) {
- this.segmentUpdateStatusManager = segmentUpdateStatusManager;
- }
-
- /**
- * @return
- */
- public String getTaskNo() {
- return taskNo;
- }
-
- /**
- * @param taskNo
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return
- */
- public long getFactTimeStamp() {
- return factTimeStamp;
- }
-
- /**
- * @param factTimeStamp
- */
- public void setFactTimeStamp(long factTimeStamp) {
- this.factTimeStamp = factTimeStamp;
- }
-
- public String[] getDelimiters() {
- return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
- }
-
- /**
- * @return load Id
- */
- public String getSegmentId() {
- return segmentId;
- }
-
- /**
- * @param segmentId
- */
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- /**
- * the method returns the value to be treated as null while data load
- *
- * @return
- */
- public String getSerializationNullFormat() {
- return serializationNullFormat;
- }
-
- /**
- * the method sets the value to be treated as null while data load
- *
- * @param serializationNullFormat
- */
- public void setSerializationNullFormat(String serializationNullFormat) {
- this.serializationNullFormat = serializationNullFormat;
- }
-
- /**
- * returns the string to enable bad record logger
- *
- * @return
- */
- public String getBadRecordsLoggerEnable() {
- return badRecordsLoggerEnable;
- }
-
- /**
- * method sets the string to specify whether to enable or dissable the badrecord logger.
- *
- * @param badRecordsLoggerEnable
- */
- public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
- this.badRecordsLoggerEnable = badRecordsLoggerEnable;
- }
-
- public String getQuoteChar() {
- return quoteChar;
- }
-
- public void setQuoteChar(String quoteChar) {
- this.quoteChar = quoteChar;
- }
-
- public String getCommentChar() {
- return commentChar;
- }
-
- public void setCommentChar(String commentChar) {
- this.commentChar = commentChar;
- }
-
- public String getDateFormat() {
- return dateFormat;
- }
-
- public void setDateFormat(String dateFormat) {
- this.dateFormat = dateFormat;
- }
-
- public String getDefaultTimestampFormat() {
- return defaultTimestampFormat;
- }
-
- public void setDefaultTimestampFormat(String defaultTimestampFormat) {
- this.defaultTimestampFormat = defaultTimestampFormat;
- }
-
- /**
- * @return
- */
- public String getMaxColumns() {
- return maxColumns;
- }
-
- /**
- * @param maxColumns
- */
- public void setMaxColumns(String maxColumns) {
- this.maxColumns = maxColumns;
- }
-
- /**
- * returns option to specify the bad record logger action
- *
- * @return
- */
- public String getBadRecordsAction() {
- return badRecordsAction;
- }
-
- /**
- * set option to specify the bad record logger action
- *
- * @param badRecordsAction
- */
- public void setBadRecordsAction(String badRecordsAction) {
- this.badRecordsAction = badRecordsAction;
- }
-
- public boolean getUseOnePass() {
- return useOnePass;
- }
-
- public void setUseOnePass(boolean useOnePass) {
- this.useOnePass = useOnePass;
- }
-
- public int getDictionaryServerPort() {
- return dictionaryServerPort;
- }
-
- public void setDictionaryServerPort(int dictionaryServerPort) {
- this.dictionaryServerPort = dictionaryServerPort;
- }
-
- public String getDictionaryServerHost() {
- return dictionaryServerHost;
- }
-
- public void setDictionaryServerHost(String dictionaryServerHost) {
- this.dictionaryServerHost = dictionaryServerHost;
- }
-
- public boolean isPreFetch() {
- return preFetch;
- }
-
- public void setPreFetch(boolean preFetch) {
- this.preFetch = preFetch;
- }
-
- public String getDefaultDateFormat() {
- return defaultDateFormat;
- }
-
- public void setDefaultDateFormat(String defaultDateFormat) {
- this.defaultDateFormat = defaultDateFormat;
- }
-
- public String getIsEmptyDataBadRecord() {
- return isEmptyDataBadRecord;
- }
-
- public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
- this.isEmptyDataBadRecord = isEmptyDataBadRecord;
- }
-
- public String getSortScope() {
- return sortScope;
- }
-
- public void setSortScope(String sortScope) {
- this.sortScope = sortScope;
- }
-
- public String getBatchSortSizeInMb() {
- return batchSortSizeInMb;
- }
-
- public void setBatchSortSizeInMb(String batchSortSizeInMb) {
- this.batchSortSizeInMb = batchSortSizeInMb;
- }
-
- public String getGlobalSortPartitions() {
- return globalSortPartitions;
- }
-
- public void setGlobalSortPartitions(String globalSortPartitions) {
- this.globalSortPartitions = globalSortPartitions;
- }
-
- public String getBadRecordsLocation() {
- return badRecordsLocation;
- }
-
- public void setBadRecordsLocation(String badRecordsLocation) {
- this.badRecordsLocation = badRecordsLocation;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
deleted file mode 100644
index 9e0aa02..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * This base abstract class for data loading.
- * It can do transformation jobs as per the implementation.
- *
- * Life cycle of this class is
- * First initialize() is called to initialize the step
- * then execute() is called to process the step logic and
- * then close() is called to close any resources if any opened in the step.
- */
-public abstract class AbstractDataLoadProcessorStep {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName());
-
- protected CarbonDataLoadConfiguration configuration;
-
- protected AbstractDataLoadProcessorStep child;
-
- protected AtomicLong rowCounter;
-
- protected boolean closed;
-
- public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
- AbstractDataLoadProcessorStep child) {
- this.configuration = configuration;
- this.child = child;
- this.rowCounter = new AtomicLong();
- this.closed = false;
- }
-
- /**
- * The output meta for this step. The data returns from this step is as per this meta.
- *
- */
- public abstract DataField[] getOutput();
-
- /**
- * Initialization process for this step.
- *
- * @throws IOException
- */
- public void initialize() throws IOException {
- if (LOGGER.isInfoEnabled()) {
- // This thread prints the rows processed in each step for every 10 seconds.
- new Thread() {
- @Override public void run() {
- while (!closed) {
- try {
- LOGGER.info("Rows processed in step " + getStepName() + " : " + rowCounter.get());
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- //ignore
- LOGGER.error(e.getMessage());
- }
- }
- }
- }.start();
- }
- }
-
- /**
- * Tranform the data as per the implementation.
- *
- * @return Array of Iterator with data. It can be processed parallel if implementation class wants
- * @throws CarbonDataLoadingException
- */
- public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
- Iterator<CarbonRowBatch>[] childIters = child.execute();
- Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
- for (int i = 0; i < childIters.length; i++) {
- iterators[i] = getIterator(childIters[i]);
- }
- return iterators;
- }
-
- /**
- * Create the iterator using child iterator.
- *
- * @param childIter
- * @return new iterator with step specific processing.
- */
- protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
- return new CarbonIterator<CarbonRowBatch>() {
- @Override public boolean hasNext() {
- return childIter.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- return processRowBatch(childIter.next());
- }
- };
- }
-
- /**
- * Process the batch of rows as per the step logic.
- *
- * @param rowBatch
- * @return processed row.
- */
- protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
- CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
- while (rowBatch.hasNext()) {
- newBatch.addRow(processRow(rowBatch.next()));
- }
- return newBatch;
- }
-
- /**
- * Process the row as per the step logic.
- *
- * @param row
- * @return processed row.
- */
- protected abstract CarbonRow processRow(CarbonRow row);
-
- /**
- * Get the step name for logging purpose.
- * @return Step name
- */
- protected abstract String getStepName();
-
-
- /**
- * Close all resources.This method is called after execute() is finished.
- * It will be called in both success and failure cases.
- */
- public void close() {
- if (!closed) {
- closed = true;
- LOGGER.info("Total rows processed in step " + this.getStepName() + ": " + rowCounter.get());
- if (child != null) {
- child.close();
- }
- }
- }
-
-}