You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:26 UTC
[39/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 9cf8a91..10e6785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.processing.loading.FailureCauses
/**
* IUD update delete and compaction framework.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 036ca49..5e9d31f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, Lock
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.processing.loading.FailureCauses
private[sql] case class ProjectForUpdateCommand(
plan: LogicalPlan, tableIdentifier: Seq[String])
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 1f06aed..e0b891a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
case class AlterTableDropCarbonPartitionCommand(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 21b974a..e16dfc9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 61589de..5c7d451 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,8 +25,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
/**
* Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index e7eb422..fd3b2cd 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
* Utility for global dictionary test cases
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 4746ecf..399665f 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -27,9 +27,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
deleted file mode 100644
index 890534e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.api.dataloader;
-
-public class DataLoadModel {
- /**
- * Schema Info
- */
- private SchemaInfo schemaInfo;
-
- /**
- * table table
- */
- private String tableName;
-
- /**
- * is CSV load
- */
- private boolean isCsvLoad;
-
- private String blocksID;
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
- /**
- * new load start time
- */
- private String factTimeStamp;
-
- private String escapeCharacter;
-
- private String quoteCharacter;
-
- private String commentCharacter;
-
- private String rddIteratorKey;
-
- private String dateFormat;
-
- private String maxColumns;
- /**
- * @return Returns the schemaInfo.
- */
- public SchemaInfo getSchemaInfo() {
- return schemaInfo;
- }
-
- /**
- * @param schemaInfo The schemaInfo to set.
- */
- public void setSchemaInfo(SchemaInfo schemaInfo) {
- this.schemaInfo = schemaInfo;
- }
-
- /**
- * @return Returns the tableName.
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @param tableName The tableName to set.
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return Returns the isCsvLoad.
- */
- public boolean isCsvLoad() {
- return isCsvLoad;
- }
-
- /**
- * @param isCsvLoad The isCsvLoad to set.
- */
- public void setCsvLoad(boolean isCsvLoad) {
- this.isCsvLoad = isCsvLoad;
- }
-
- /**
- * get block id
- *
- * @return
- */
- public String getBlocksID() {
- return blocksID;
- }
-
- /**
- * set block id to data load model
- *
- * @param blocksID
- */
- public void setBlocksID(String blocksID) {
- this.blocksID = blocksID;
- }
-
- /**
- * @return
- */
- public String getTaskNo() {
- return taskNo;
- }
-
- /**
- * @param taskNo
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return
- */
- public String getFactTimeStamp() {
- return factTimeStamp;
- }
-
- /**
- * @param factTimeStamp
- */
- public void setFactTimeStamp(String factTimeStamp) {
- this.factTimeStamp = factTimeStamp;
- }
-
- public String getEscapeCharacter() {
- return escapeCharacter;
- }
-
- public void setEscapeCharacter(String escapeCharacter) {
- this.escapeCharacter = escapeCharacter;
- }
-
- public String getQuoteCharacter() { return quoteCharacter; }
-
- public void setQuoteCharacter(String quoteCharacter) { this.quoteCharacter = quoteCharacter; }
-
- public String getCommentCharacter() { return commentCharacter; }
-
- public void setCommentCharacter(String commentCharacter) {
- this.commentCharacter = commentCharacter;
- }
-
- public String getDateFormat() { return dateFormat; }
-
- public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
- /**
- * @return
- */
- public String getMaxColumns() {
- return maxColumns;
- }
-
- /**
- * @param maxColumns
- */
- public void setMaxColumns(String maxColumns) {
- this.maxColumns = maxColumns;
- }
-
- public String getRddIteratorKey() {
- return rddIteratorKey;
- }
-
- public void setRddIteratorKey(String rddIteratorKey) {
- this.rddIteratorKey = rddIteratorKey;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
deleted file mode 100644
index 88c4879..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.api.dataloader;
-
-public class SchemaInfo {
-
- /**
- * databaseName
- */
- private String databaseName;
-
- /**
- * tableName
- */
- private String tableName;
-
- /**
- * isAutoAggregateRequest
- */
- private boolean isAutoAggregateRequest;
-
- private String complexDelimiterLevel1;
-
- private String complexDelimiterLevel2;
- /**
- * the value to be treated as null while data load
- */
- private String serializationNullFormat;
-
- /**
- * defines the string to specify whether the bad record logger should be enabled or not
- */
- private String badRecordsLoggerEnable;
- /**
- * defines the option to specify whether to bad record logger action
- */
- private String badRecordsLoggerAction;
-
-
- public String getComplexDelimiterLevel1() {
- return complexDelimiterLevel1;
- }
-
- public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
- this.complexDelimiterLevel1 = complexDelimiterLevel1;
- }
-
- public String getComplexDelimiterLevel2() {
- return complexDelimiterLevel2;
- }
-
- public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
- this.complexDelimiterLevel2 = complexDelimiterLevel2;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return the isAutoAggregateRequest
- */
- public boolean isAutoAggregateRequest() {
- return isAutoAggregateRequest;
- }
-
- /**
- * @param isAutoAggregateRequest the isAutoAggregateRequest to set
- */
- public void setAutoAggregateRequest(boolean isAutoAggregateRequest) {
- this.isAutoAggregateRequest = isAutoAggregateRequest;
- }
-
- /**
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @param databaseName the databaseName to set
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * the method returns the value to be treated as null while data load
- * @return
- */
- public String getSerializationNullFormat() {
- return serializationNullFormat;
- }
-
- /**
- * the method sets the value to be treated as null while data load
- * @param serializationNullFormat
- */
- public void setSerializationNullFormat(String serializationNullFormat) {
- this.serializationNullFormat = serializationNullFormat;
- }
-
- /**
- * returns the string to enable bad record logger
- * @return
- */
- public String getBadRecordsLoggerEnable() {
- return badRecordsLoggerEnable;
- }
-
- /**
- * method sets the string to specify whether to enable or dissable the badrecord logger.
- * @param badRecordsLoggerEnable
- */
- public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
- this.badRecordsLoggerEnable = badRecordsLoggerEnable;
- }
-
- /**
- * returns the option to set bad record logger action
- * @return
- */
- public String getBadRecordsLoggerAction() {
- return badRecordsLoggerAction;
- }
-
- /**
- * set the option to set set bad record logger action
- * @param badRecordsLoggerAction
- */
- public void setBadRecordsLoggerAction(String badRecordsLoggerAction) {
- this.badRecordsLoggerAction = badRecordsLoggerAction;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
deleted file mode 100644
index 05f561f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-public final class DataProcessorConstants {
- /**
- *
- */
- public static final String CSV_DATALOADER = "CSV_DATALOADER";
- /**
- *
- */
- public static final String DATARESTRUCT = "DATARESTRUCT";
- /**
- * UPDATEMEMBER
- */
- public static final String UPDATEMEMBER = "UPDATEMEMBER";
- /**
- * number of days task should be in DB table
- */
- public static final String TASK_RETENTION_DAYS = "dataload.taskstatus.retention";
- /**
- * LOAD_FOLDER
- */
- public static final String LOAD_FOLDER = "Load_";
- /**
- * if bad record found
- */
- public static final long BAD_REC_FOUND = 223732673;
- /**
- * if bad record found
- */
- public static final long CSV_VALIDATION_ERRROR_CODE = 113732678;
- /**
- * Year Member val for data retention.
- */
- public static final String YEAR = "YEAR";
-
- /**
- * if data load fails due to bad record
- */
- public static final long BAD_REC_FAILURE_ERROR_CODE = 223732674;
-
- private DataProcessorConstants() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
deleted file mode 100644
index 3917974..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-/**
- * enum holds the value related to the ddl option
- */
-public enum TableOptionConstant {
- SERIALIZATION_NULL_FORMAT("serialization_null_format"),
- BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"),
- BAD_RECORDS_ACTION("bad_records_action");
-
- private String name;
-
- /**
- * constructor to initialize the enum value
- * @param name
- */
- TableOptionConstant(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
deleted file mode 100644
index d6d214b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- * blocks info
- */
-public class BlockDetails extends FileSplit implements Serializable {
-
- /**
- * serialization version
- */
- private static final long serialVersionUID = 2293906691860002339L;
- //block offset
- private long blockOffset;
- //block length
- private long blockLength;
- //file path which block belong to
- private String filePath;
- // locations where this block exists
- private String[] locations;
-
- public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
- super(filePath, blockOffset, blockLength, locations);
- this.filePath = filePath.toString();
- this.blockOffset = blockOffset;
- this.blockLength = blockLength;
- this.locations = locations;
- }
-
- public long getBlockOffset() {
- return blockOffset;
- }
-
- public long getBlockLength() {
- return blockLength;
- }
-
- public String getFilePath() {
- return FileFactory.getUpdatedFilePath(filePath);
- }
-
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- /** The file containing this split's data. */
- @Override
- public Path getPath() { return new Path(filePath); }
-
- /** The position of the first byte in the file to process. */
- @Override
- public long getStart() { return blockOffset; }
-
- /** The number of bytes in the file to process. */
- @Override
- public long getLength() { return blockLength; }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
deleted file mode 100644
index 9f80c07..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Customarized reader class to read data from file
- * untill the upper threshold reached.
- */
-public class BoundedInputStream extends InputStream {
-
- /**
- * byte value of the new line character
- */
- private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
- /**
- * number of extra character to read
- */
- private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
- /**
- * number of bytes remaining
- */
- private long remaining;
- /**
- * to check whether end of line is found
- */
- private boolean endOfLineFound = false;
-
- private DataInputStream in;
-
- public BoundedInputStream(DataInputStream in, long limit) {
- this.in = in;
- this.remaining = limit;
- }
-
- /**
- * Below method will be used to read the data from file
- *
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read() throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- int var1 = this.in.read();
- if (var1 >= 0) {
- --this.remaining;
- }
-
- return var1;
- }
- }
-
- /**
- * Below method will be used to read the data from file. If limit reaches in
- * that case it will read until new line character is reached
- *
- * @param buffer
- * buffer in which data will be read
- * @param offset
- * from position to buffer will be filled
- * @param length
- * number of character to be read
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read(byte[] buffer, int offset, int length) throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- if (this.remaining < length) {
- length = (int) this.remaining;
- }
-
- length = this.in.read(buffer, offset, length);
- if (length >= 0) {
- this.remaining -= length;
- if (this.remaining == 0 && !endOfLineFound) {
- endOfLineFound = true;
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- } else if (endOfLineFound) {
- int end = offset + length;
- for (int i = offset; i < end; i++) {
- if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
- this.remaining = 0;
- return (i - offset) + 1;
- }
- }
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- }
- }
- return length;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-
- public long getRemaining() {
- return this.remaining;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
deleted file mode 100644
index c793126..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.Charset;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-import com.univocity.parsers.csv.CsvParser;
-import com.univocity.parsers.csv.CsvParserSettings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.LineReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files. Files are broken into lines.
- * Values are the line of csv files.
- */
-public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
-
- public static final String DELIMITER = "carbon.csvinputformat.delimiter";
- public static final String DELIMITER_DEFAULT = ",";
- public static final String COMMENT = "carbon.csvinputformat.comment";
- public static final String COMMENT_DEFAULT = "#";
- public static final String QUOTE = "carbon.csvinputformat.quote";
- public static final String QUOTE_DEFAULT = "\"";
- public static final String ESCAPE = "carbon.csvinputformat.escape";
- public static final String ESCAPE_DEFAULT = "\\";
- public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
- public static final boolean HEADER_PRESENT_DEFAULT = false;
- public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
- public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
- public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
- public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns";
- public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000;
- public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000;
-
- private static LogService LOGGER =
- LogServiceFactory.getLogService(CSVInputFormat.class.toString());
-
-
- @Override
- public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new CSVRecordReader();
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
- .getCodec(file);
- if (null == codec) {
- return true;
- }
- return codec instanceof SplittableCompressionCodec;
- }
-
- /**
- * Sets the comment char to configuration. Default it is #.
- * @param configuration
- * @param commentChar
- */
- public static void setCommentCharacter(Configuration configuration, String commentChar) {
- if (commentChar != null && !commentChar.isEmpty()) {
- configuration.set(COMMENT, commentChar);
- }
- }
-
- /**
- * Sets the delimiter to configuration. Default it is ','
- * @param configuration
- * @param delimiter
- */
- public static void setCSVDelimiter(Configuration configuration, String delimiter) {
- if (delimiter != null && !delimiter.isEmpty()) {
- configuration.set(DELIMITER, delimiter);
- }
- }
-
- /**
- * Sets the escape character to configuration. Default it is \
- * @param configuration
- * @param escapeCharacter
- */
- public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
- if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
- configuration.set(ESCAPE, escapeCharacter);
- }
- }
-
- /**
- * Whether header needs to read from csv or not. By default it is false.
- * @param configuration
- * @param headerExtractEnable
- */
- public static void setHeaderExtractionEnabled(Configuration configuration,
- boolean headerExtractEnable) {
- configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
- }
-
- /**
- * Sets the quote character to configuration. Default it is "
- * @param configuration
- * @param quoteCharacter
- */
- public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
- if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
- configuration.set(QUOTE, quoteCharacter);
- }
- }
-
- /**
- * Sets the read buffer size to configuration.
- * @param configuration
- * @param bufferSize
- */
- public static void setReadBufferSize(Configuration configuration, String bufferSize) {
- if (bufferSize != null && !bufferSize.isEmpty()) {
- configuration.set(READ_BUFFER_SIZE, bufferSize);
- }
- }
-
- public static void setMaxColumns(Configuration configuration, String maxColumns) {
- if (maxColumns != null) {
- configuration.set(MAX_COLUMNS, maxColumns);
- }
- }
-
- public static void setNumberOfColumns(Configuration configuration, String numberOfColumns) {
- configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
- }
-
- /**
- * Treats value as line in file. Key is null.
- */
- public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
-
- private long start;
- private long end;
- private BoundedInputStream boundedInputStream;
- private Reader reader;
- private CsvParser csvParser;
- private StringArrayWritable value;
- private String[] columns;
- private Seekable filePosition;
- private boolean isCompressedInput;
- private Decompressor decompressor;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- FileSplit split = (FileSplit) inputSplit;
- start = split.getStart();
- end = start + split.getLength();
- Path file = split.getPath();
- Configuration job = context.getConfiguration();
- CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
- FileSystem fs = file.getFileSystem(job);
- int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
- FSDataInputStream fileIn = fs.open(file, bufferSize);
- InputStream inputStream;
- if (codec != null) {
- isCompressedInput = true;
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
- .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
- .READ_MODE.BYBLOCK);
- start = scIn.getAdjustedStart();
- end = scIn.getAdjustedEnd();
- if (start != 0) {
- LineReader lineReader = new LineReader(scIn, 1);
- start += lineReader.readLine(new Text(), 0);
- }
- filePosition = scIn;
- inputStream = scIn;
- } else {
- CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
- filePosition = cIn;
- inputStream = cIn;
- }
- } else {
- fileIn.seek(start);
- if (start != 0) {
- LineReader lineReader = new LineReader(fileIn, 1);
- start += lineReader.readLine(new Text(), 0);
- }
- boundedInputStream = new BoundedInputStream(fileIn, end - start);
- filePosition = fileIn;
- inputStream = boundedInputStream;
- }
- reader = new InputStreamReader(inputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
- csvParser = new CsvParser(extractCsvParserSettings(job));
- csvParser.beginParsing(reader);
- }
-
- private CsvParserSettings extractCsvParserSettings(Configuration job) {
- CsvParserSettings parserSettings = new CsvParserSettings();
- parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
- parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
- parserSettings.setLineSeparatorDetectionEnabled(true);
- parserSettings.setNullValue("");
- parserSettings.setEmptyValue("");
- parserSettings.setIgnoreLeadingWhitespaces(false);
- parserSettings.setIgnoreTrailingWhitespaces(false);
- parserSettings.setSkipEmptyLines(false);
- parserSettings.setMaxCharsPerColumn(100000);
- String maxColumns = job.get(MAX_COLUMNS);
- parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
- parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
- parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
- if (start == 0) {
- parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
- HEADER_PRESENT_DEFAULT));
- }
- return parserSettings;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (csvParser == null) {
- return false;
- }
- columns = csvParser.parseNext();
- if (columns == null) {
- value = null;
- return false;
- }
- if (value == null) {
- value = new StringArrayWritable();
- }
- value.set(columns);
- return true;
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return NullWritable.get();
- }
-
- @Override
- public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- private long getPos() throws IOException {
- long retVal = start;
- if (null != boundedInputStream) {
- retVal = end - boundedInputStream.getRemaining();
- } else if (isCompressedInput && null != filePosition) {
- retVal = filePosition.getPos();
- }
- return retVal;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
- start) / (float) (end - start));
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (reader != null) {
- reader.close();
- }
- if (boundedInputStream != null) {
- boundedInputStream.close();
- }
- if (null != csvParser) {
- csvParser.stopParsing();
- }
- } finally {
- reader = null;
- boundedInputStream = null;
- csvParser = null;
- filePosition = null;
- value = null;
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
deleted file mode 100644
index efe75ef..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import com.univocity.parsers.common.TextParsingException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * It is wrapper iterator around @{@link RecordReader}.
- */
-public class CSVRecordReaderIterator extends CarbonIterator<Object []> {
-
- private RecordReader<NullWritable, StringArrayWritable> recordReader;
-
- /**
- * It is just a little hack to make recordreader as iterator. Usually we cannot call hasNext
- * multiple times on record reader as it moves another line. To avoid that situation like hasNext
- * only tells whether next row is present or not and next will move the pointer to next row after
- * consuming it.
- */
- private boolean isConsumed;
-
- private InputSplit split;
-
- private TaskAttemptContext context;
-
- public CSVRecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
- InputSplit split, TaskAttemptContext context) {
- this.recordReader = recordReader;
- this.split = split;
- this.context = context;
- }
-
- @Override
- public boolean hasNext() {
- try {
- if (!isConsumed) {
- isConsumed = recordReader.nextKeyValue();
- return isConsumed;
- }
- return true;
- } catch (Exception e) {
- if (e instanceof TextParsingException) {
- throw new CarbonDataLoadingException(
- CarbonDataProcessorUtil.trimErrorMessage(e.getMessage()));
- }
- throw new CarbonDataLoadingException(e);
- }
- }
-
- @Override
- public Object[] next() {
- try {
- String[] data = recordReader.getCurrentValue().get();
- isConsumed = false;
- return data;
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- @Override
- public void initialize() {
- try {
- recordReader.initialize(split, context);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- recordReader.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
deleted file mode 100644
index 7eb3ec9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * A String sequence that is usable as a key or value.
- */
-public class StringArrayWritable implements Writable {
- private String[] values;
-
- public String[] toStrings() {
- return values;
- }
-
- public void set(String[] values) {
- this.values = values;
- }
-
- public String[] get() {
- return values;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
- values = new String[length];
- for (int i = 0; i < length; i++) {
- byte[] b = new byte[in.readInt()];
- in.readFully(b);
- values[i] = new String(b, Charset.defaultCharset());
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(values.length); // write values
- for (int i = 0; i < values.length; i++) {
- byte[] b = values[i].getBytes(Charset.defaultCharset());
- out.writeInt(b.length);
- out.write(b);
- }
- }
-
- @Override
- public String toString() {
- return Arrays.toString(values);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
deleted file mode 100644
index 5cf2078..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.dataprocessor.manager;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public final class CarbonDataProcessorManager {
- /**
- * instance
- */
- private static final CarbonDataProcessorManager INSTANCE = new CarbonDataProcessorManager();
-
- /**
- * managerHandlerMap
- */
- private Map<String, Object> managerHandlerMap;
-
- /**
- * private constructor
- */
- private CarbonDataProcessorManager() {
- managerHandlerMap = new HashMap<String, Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- }
-
- /**
- * Get instance method will be used to get the class instance
- *
- * @return
- */
- public static CarbonDataProcessorManager getInstance() {
- return INSTANCE;
- }
-
- /**
- * Below method will be used to get the lock object for all the data processing request.
- * form the local map, if empty than it will update the map and return the lock object
- *
- * @param key
- * @return
- */
- public synchronized Object getDataProcessingLockObject(String key) {
- Object object = managerHandlerMap.get(key);
- if (null == object) {
- object = new Object();
- managerHandlerMap.put(key, object);
- }
- return object;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 7661577..86a6744 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a9c2bfe..95d7d2e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -44,9 +44,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
+import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
+import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
/**
* Primitive DataType stateless object used in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 68b6911..e90fd4a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
/**
* Struct DataType stateless object used in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
deleted file mode 100644
index 4cb5961..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.etl;
-
-public class DataLoadingException extends Exception {
- private static final long serialVersionUID = 1L;
-
- private long errorCode = -1;
-
- public DataLoadingException() {
- super();
- }
-
- public DataLoadingException(long errorCode, String message) {
- super(message);
- this.errorCode = errorCode;
- }
-
- public DataLoadingException(String message) {
- super(message);
- }
-
- public DataLoadingException(Throwable cause) {
- super(cause);
- }
-
- public DataLoadingException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public long getErrorCode() {
- return errorCode;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
new file mode 100644
index 0000000..15ff95e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+public class DataLoadingException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ private long errorCode = -1;
+
+ public DataLoadingException() {
+ super();
+ }
+
+ public DataLoadingException(long errorCode, String message) {
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public DataLoadingException(String message) {
+ super(message);
+ }
+
+ public DataLoadingException(Throwable cause) {
+ super(cause);
+ }
+
+ public DataLoadingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public long getErrorCode() {
+ return errorCode;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
new file mode 100644
index 0000000..d9640a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+import java.util.Locale;
+
+public class SliceMergerException extends Exception {
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public SliceMergerException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public SliceMergerException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * This method is used to get the localized message.
+ *
+ * @param locale - A Locale object represents a specific geographical,
+ * political, or cultural region.
+ * @return - Localized error message.
+ */
+ public String getLocalizedMessage(Locale locale) {
+ return "";
+ }
+
+ /**
+ * getLocalizedMessage
+ */
+ @Override public String getLocalizedMessage() {
+ return super.getLocalizedMessage();
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
new file mode 100644
index 0000000..9f2482b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+
+/**
+ * Base abstract class for the steps of data loading.
+ * A step can do transformation jobs as per its implementation.
+ *
+ * Life cycle of this class is
+ * First initialize() is called to initialize the step
+ * then execute() is called to process the step logic and
+ * then close() is called to close any resources if any opened in the step.
+ */
+public abstract class AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName());
+
+  /**
+   * Load configuration handed to the constructor.
+   */
+  protected CarbonDataLoadConfiguration configuration;
+
+  /**
+   * The step whose output this step consumes: execute() wraps its iterators and close()
+   * cascades to it. close() tolerates a null child, execute() of this base class does not.
+   */
+  protected AbstractDataLoadProcessorStep child;
+
+  /**
+   * Row count reported in the progress and summary log lines. This base class only reads it;
+   * presumably the concrete steps increment it while processing rows.
+   */
+  protected AtomicLong rowCounter;
+
+  /**
+   * Set by close(). It is volatile because the progress-logging thread started in initialize()
+   * polls it from another thread; without volatile that thread might never observe the change
+   * and would keep running.
+   */
+  protected volatile boolean closed;
+
+  /**
+   * @param configuration load configuration for this step
+   * @param child         step that feeds this step
+   */
+  public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    this.configuration = configuration;
+    this.child = child;
+    this.rowCounter = new AtomicLong();
+    this.closed = false;
+  }
+
+  /**
+   * The output meta for this step. The data returns from this step is as per this meta.
+   *
+   */
+  public abstract DataField[] getOutput();
+
+  /**
+   * Initialization process for this step. When info logging is enabled it starts a background
+   * thread that logs the current row count every 10 seconds until the step is closed.
+   *
+   * @throws IOException
+   */
+  public void initialize() throws IOException {
+    if (LOGGER.isInfoEnabled()) {
+      // This thread prints the rows processed in each step for every 10 seconds.
+      Thread progressLogger = new Thread() {
+        @Override public void run() {
+          while (!closed) {
+            try {
+              LOGGER.info("Rows processed in step " + getStepName() + " : " + rowCounter.get());
+              Thread.sleep(10000);
+            } catch (InterruptedException e) {
+              // An interrupt is a request to stop: restore the flag and end the reporting loop
+              // instead of logging the interrupt and looping again.
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+        }
+      };
+      // Pure progress reporting must not keep the JVM alive if a step is never closed.
+      progressLogger.setDaemon(true);
+      progressLogger.start();
+    }
+  }
+
+  /**
+   * Transform the data as per the implementation. This base implementation executes the child
+   * step and wraps each of its iterators with {@link #getIterator(Iterator)}.
+   *
+   * @return Array of Iterator with data. It can be processed parallel if implementation class wants
+   * @throws CarbonDataLoadingException
+   */
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childIters = child.execute();
+    // Generic arrays cannot be created directly; every slot is filled below with an
+    // Iterator<CarbonRowBatch>, so the unchecked conversion is safe.
+    @SuppressWarnings("unchecked")
+    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
+    for (int i = 0; i < childIters.length; i++) {
+      iterators[i] = getIterator(childIters[i]);
+    }
+    return iterators;
+  }
+
+  /**
+   * Create the iterator using child iterator. Each batch pulled from the child is passed
+   * through {@link #processRowBatch(CarbonRowBatch)} at the moment it is requested.
+   *
+   * @param childIter iterator over the batches produced by the child step
+   * @return new iterator with step specific processing.
+   */
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next());
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic: every row of the given batch is passed to
+   * {@link #processRow(CarbonRow)} and the results are collected into a new batch.
+   *
+   * @param rowBatch batch to process; it is iterated to its end
+   * @return a new batch holding the processed rows.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      newBatch.addRow(processRow(rowBatch.next()));
+    }
+    return newBatch;
+  }
+
+  /**
+   * Process the row as per the step logic.
+   *
+   * @param row
+   * @return processed row.
+   */
+  protected abstract CarbonRow processRow(CarbonRow row);
+
+  /**
+   * Get the step name for logging purpose.
+   * @return Step name
+   */
+  protected abstract String getStepName();
+
+
+  /**
+   * Close all resources.This method is called after execute() is finished.
+   * It will be called in both success and failure cases.
+   * Only the first call has an effect: it marks the step closed (which also stops the progress
+   * logging thread), logs the total row count and closes the child step if there is one.
+   */
+  public void close() {
+    if (!closed) {
+      closed = true;
+      LOGGER.info("Total rows processed in step " + this.getStepName() + ": " + rowCounter.get());
+      if (child != null) {
+        child.close();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
new file mode 100644
index 0000000..bc0ce3a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * Records rows that were rejected as bad records. Depending on the flags given to the
+ * constructor a row is redirected to a csv file and/or written together with its reason to a
+ * log file. In every case the task key is marked as "Partially" in a static registry that can be
+ * queried through {@link #hasBadRecord(String)}.
+ */
+public class BadRecordsLogger {
+
+  /**
+   * Comment for <code>LOGGER</code>
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BadRecordsLogger.class.getName());
+  /**
+   * Which holds the key and if any bad rec found to check from API to update
+   * the status.
+   * The map is static, so it is shared by all logger instances and by the static accessors,
+   * none of which hold a common lock; it is therefore wrapped in a synchronized view.
+   */
+  private static final Map<String, String> BAD_RECORD_ENTRY = Collections.synchronizedMap(
+      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
+  /**
+   * File Name
+   */
+  private String fileName;
+  /**
+   * Store path
+   */
+  private String storePath;
+  /**
+   * log file writer and the stream it wraps
+   */
+  private BufferedWriter bufferedWriter;
+  private DataOutputStream outStream;
+  /**
+   * csv file writer and the stream it wraps
+   */
+  private BufferedWriter bufferedCSVWriter;
+  private DataOutputStream outCSVStream;
+  /**
+   * bad record log file path
+   */
+  private String logFilePath;
+  /**
+   * csv file path
+   */
+  private String csvFilePath;
+
+  /**
+   * task key which is DatabaseName/TableName/tablename
+   */
+  private String taskKey;
+
+  private boolean badRecordsLogRedirect;
+
+  private boolean badRecordLoggerEnable;
+
+  private boolean badRecordConvertNullDisable;
+
+  private boolean isDataLoadFail;
+
+  /**
+   * @param key                         task key under which bad records are registered
+   * @param fileName                    base name of the log / csv files
+   * @param storePath                   folder in which the files are written
+   * @param badRecordsLogRedirect       whether bad rows are written to the csv file
+   * @param badRecordLoggerEnable       whether bad rows and their reason are written to the log
+   * @param badRecordConvertNullDisable exposed via {@link #isBadRecordConvertNullDisable()}
+   * @param isDataLoadFail              exposed via {@link #isDataLoadFail()}
+   */
+  public BadRecordsLogger(String key, String fileName, String storePath,
+      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
+      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
+    // Initially no bad rec
+    taskKey = key;
+    this.fileName = fileName;
+    this.storePath = storePath;
+    this.badRecordsLogRedirect = badRecordsLogRedirect;
+    this.badRecordLoggerEnable = badRecordLoggerEnable;
+    this.badRecordConvertNullDisable = badRecordConvertNullDisable;
+    this.isDataLoadFail = isDataLoadFail;
+  }
+
+  /**
+   * @param key DatabaseName/TableName/tablename
+   * @return "Partially" if a bad record was registered for the key, otherwise null
+   */
+  public static String hasBadRecord(String key) {
+    return BAD_RECORD_ENTRY.get(key);
+  }
+
+  /**
+   * Removes the key from the bad record registry.
+   *
+   * @param key DatabaseName/TableName/tablename
+   * @return the value that was registered for the key, or null if there was none
+   */
+  public static String removeBadRecordKey(String key) {
+    return BAD_RECORD_ENTRY.remove(key);
+  }
+
+  /**
+   * Handles one bad row. If redirect or logging is enabled the row is rendered as a comma
+   * separated line and written to the csv file (redirect) and/or, followed by "----->" and the
+   * reason, to the log file (logging). Otherwise only the task key is registered.
+   *
+   * @param row    values of the bad row; rendering stops at the first null element
+   * @param reason why the row is bad, may be null
+   * @throws CarbonDataLoadingException if writing to one of the files fails
+   */
+  public void addBadRecordsToBuilder(Object[] row, String reason)
+      throws CarbonDataLoadingException {
+    if (!badRecordsLogRedirect && !badRecordLoggerEnable) {
+      // setting partial success entry since even if bad records are there then load
+      // status should be partial success regardless of bad record logged
+      BAD_RECORD_ENTRY.put(taskKey, "Partially");
+      return;
+    }
+    StringBuilder logStrings = buildRowText(row);
+    if (badRecordsLogRedirect) {
+      writeBadRecordsToCSVFile(logStrings);
+    }
+    if (badRecordLoggerEnable) {
+      logStrings.append("----->");
+      if (null != reason) {
+        // replace() returns the reason unchanged when the marker does not occur in it
+        logStrings.append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, "null"));
+      }
+      writeBadRecordsToFile(logStrings);
+    }
+  }
+
+  /**
+   * Renders the row as comma separated text. Values equal to the member default value are
+   * written as "null". The first null element ends the line; the separator already written
+   * before it is dropped.
+   */
+  private static StringBuilder buildRowText(Object[] row) {
+    StringBuilder text = new StringBuilder();
+    int lastIndex = row.length - 1;
+    for (int i = 0; i < row.length; i++) {
+      if (null == row[i]) {
+        int tail = text.length() - 1;
+        if (tail >= 0 && text.charAt(tail) == ',') {
+          text.deleteCharAt(tail);
+        }
+        break;
+      }
+      if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
+        text.append("null");
+      } else {
+        text.append(row[i]);
+      }
+      if (i < lastIndex) {
+        text.append(',');
+      }
+    }
+    return text;
+  }
+
+  /**
+   * Creates the store folder and an empty file at the given path, but only when the store
+   * folder does not exist yet.
+   */
+  private void createStoreAndFileIfMissing(String filePath, FileType fileType)
+      throws IOException {
+    if (!FileFactory.isFileExist(this.storePath, fileType)) {
+      // create the folders if not exist
+      FileFactory.mkdirs(this.storePath, fileType);
+
+      // create the files
+      FileFactory.createNewFile(filePath, fileType);
+    }
+  }
+
+  /**
+   * Appends one line to the bad record log file. The file path and the writer are created on
+   * first use. The task key is registered as "Partially" whether or not the write succeeds.
+   *
+   * @param logStrings line to write
+   * @throws CarbonDataLoadingException if the file cannot be opened or written
+   */
+  private synchronized void writeBadRecordsToFile(StringBuilder logStrings)
+      throws CarbonDataLoadingException {
+    if (null == logFilePath) {
+      logFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        createStoreAndFileIfMissing(logFilePath, fileType);
+
+        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
+
+        bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedWriter.write(logStrings.toString());
+      bufferedWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad Log Files not found");
+      throw new CarbonDataLoadingException("Bad Log Files not found", e);
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad record log File");
+      throw new CarbonDataLoadingException("Error While writing bad record log File", e);
+    } finally {
+      // if the Bad record file is created means it partially success
+      // if any entry present with key that means its have bad record for
+      // that key
+      BAD_RECORD_ENTRY.put(taskKey, "Partially");
+    }
+  }
+
+  /**
+   * method will write the row having bad record in the csv file. The file path and the writer
+   * are created on first use. The task key is registered as "Partially" whether or not the
+   * write succeeds.
+   *
+   * @param logStrings line to write
+   * @throws CarbonDataLoadingException if the file cannot be opened or written
+   */
+  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings)
+      throws CarbonDataLoadingException {
+    if (null == csvFilePath) {
+      csvFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedCSVWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        createStoreAndFileIfMissing(csvFilePath, fileType);
+
+        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
+
+        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedCSVWriter.write(logStrings.toString());
+      bufferedCSVWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad record csv Files not found");
+      throw new CarbonDataLoadingException("Bad record csv Files not found", e);
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad record csv File");
+      throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
+    } finally {
+      BAD_RECORD_ENTRY.put(taskKey, "Partially");
+    }
+  }
+
+  public boolean isBadRecordConvertNullDisable() {
+    return badRecordConvertNullDisable;
+  }
+
+  public boolean isDataLoadFail() {
+    return isDataLoadFail;
+  }
+
+  public boolean isBadRecordLoggerEnable() {
+    return badRecordLoggerEnable;
+  }
+
+  public boolean isBadRecordsLogRedirect() {
+    return badRecordsLogRedirect;
+  }
+
+  /**
+   * Closes the log and csv writers together with their underlying streams.
+   */
+  public synchronized void closeStreams() {
+    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
new file mode 100644
index 0000000..7309c91
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.processing.loading.converter.DictionaryCardinalityFinder;
+
+/**
+ * Holder for the settings of one data load: the fields to load, identifiers of the target
+ * table/segment/task, dictionary server settings, sort column counts and free-form load
+ * properties. It also derives column counts and dimension lengths from those settings.
+ */
+public class CarbonDataLoadConfiguration {
+
+  private DataField[] dataFields;
+
+  private AbsoluteTableIdentifier tableIdentifier;
+
+  private String[] header;
+
+  private String partitionId;
+
+  private String segmentId;
+
+  private String taskNo;
+
+  private BucketingInfo bucketingInfo;
+
+  private Map<String, Object> dataLoadProperties = new HashMap<>();
+
+  /**
+   * Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary server port
+   */
+  private int dictionaryServerPort;
+
+  private boolean preFetch;
+
+  // the four counters below are derived from the data fields in setDataFields()
+  private int dimensionCount;
+
+  private int measureCount;
+
+  private int noDictionaryCount;
+
+  private int complexColumnCount;
+
+  /**
+   * schema updated time stamp to be used for restructure scenarios
+   */
+  private long schemaUpdatedTimeStamp;
+
+  private DictionaryCardinalityFinder cardinalityFinder;
+
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
+  // contains metadata used in write step of loading process
+  private TableSpec tableSpec;
+
+  public CarbonDataLoadConfiguration() {
+  }
+
+  /**
+   * Sets the fields to load and recomputes the dimension, no-dictionary, complex and measure
+   * counts from them.
+   *
+   * @param dataFields fields of the load, must not be null
+   */
+  public void setDataFields(DataField[] dataFields) {
+    this.dataFields = dataFields;
+
+    // start from zero so that calling this method again does not add to the previous counts
+    dimensionCount = 0;
+    noDictionaryCount = 0;
+    complexColumnCount = 0;
+    measureCount = 0;
+
+    // set counts for each column category
+    for (DataField dataField : dataFields) {
+      CarbonColumn column = dataField.getColumn();
+      if (column.isDimension()) {
+        dimensionCount++;
+        if (!dataField.hasDictionaryEncoding()) {
+          noDictionaryCount++;
+        }
+      }
+      if (column.isComplex()) {
+        complexColumnCount++;
+      }
+      if (column.isMeasure()) {
+        measureCount++;
+      }
+    }
+  }
+
+  public DataField[] getDataFields() {
+    return dataFields;
+  }
+
+  /**
+   * @return number of data fields whose column is a dimension
+   */
+  public int getDimensionCount() {
+    return dimensionCount;
+  }
+
+  /**
+   * @return number of dimension fields without dictionary encoding
+   */
+  public int getNoDictionaryCount() {
+    return noDictionaryCount;
+  }
+
+  /**
+   * @return number of data fields whose column is complex
+   */
+  public int getComplexColumnCount() {
+    return complexColumnCount;
+  }
+
+  /**
+   * @return number of data fields whose column is a measure
+   */
+  public int getMeasureCount() {
+    return measureCount;
+  }
+
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  public int getNumberOfSortColumns() {
+    return this.numberOfSortColumns;
+  }
+
+  /**
+   * @return true if at least one sort column is configured
+   */
+  public boolean isSortTable() {
+    return this.numberOfSortColumns > 0;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return this.numberOfNoDictSortColumns;
+  }
+
+  public String[] getHeader() {
+    return header;
+  }
+
+  public void setHeader(String[] header) {
+    this.header = header;
+  }
+
+  public AbsoluteTableIdentifier getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * Stores a free-form load property, replacing any value already stored under the key.
+   */
+  public void setDataLoadProperty(String key, Object value) {
+    dataLoadProperties.put(key, value);
+  }
+
+  /**
+   * @return the value stored under the key, or null if there is none
+   */
+  public Object getDataLoadProperty(String key) {
+    return dataLoadProperties.get(key);
+  }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
+
+  public boolean isPreFetch() {
+    return preFetch;
+  }
+
+  public void setPreFetch(boolean preFetch) {
+    this.preFetch = preFetch;
+  }
+
+  public long getSchemaUpdatedTimeStamp() {
+    return schemaUpdatedTimeStamp;
+  }
+
+  public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+    this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+  }
+
+  public DictionaryCardinalityFinder getCardinalityFinder() {
+    return cardinalityFinder;
+  }
+
+  public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
+    this.cardinalityFinder = cardinalityFinder;
+  }
+
+  /**
+   * Collects the data types of all fields whose column is not a dimension, in field order.
+   *
+   * @return one data type per non-dimension field; empty if there is none
+   */
+  public DataType[] getMeasureDataType() {
+    List<DataType> measureTypes = new ArrayList<>(dataFields.length);
+    for (DataField dataField : dataFields) {
+      CarbonColumn column = dataField.getColumn();
+      if (!column.isDimension()) {
+        measureTypes.add(column.getDataType());
+      }
+    }
+    return measureTypes.toArray(new DataType[measureTypes.size()]);
+  }
+
+  /**
+   * Returns the non-zero entries of the cardinality array reported by the cardinality finder.
+   * When no sort columns are configured every non-zero entry is first replaced by
+   * Integer.MAX_VALUE.
+   *
+   * @return the non-zero dimension lengths, in their original order
+   */
+  public int[] calcDimensionLengths() {
+    int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
+    if (!isSortTable()) {
+      // NOTE(review): this writes into the array handed out by the cardinality finder; if the
+      // finder returns its internal array rather than a copy, the change is visible to it.
+      for (int i = 0; i < dimLensWithComplex.length; i++) {
+        if (dimLensWithComplex[i] != 0) {
+          dimLensWithComplex[i] = Integer.MAX_VALUE;
+        }
+      }
+    }
+    List<Integer> dimsLenList = new ArrayList<Integer>();
+    for (int eachDimLen : dimLensWithComplex) {
+      if (eachDimLen != 0) {
+        dimsLenList.add(eachDimLen);
+      }
+    }
+    int[] dimLens = new int[dimsLenList.size()];
+    for (int i = 0; i < dimLens.length; i++) {
+      dimLens[i] = dimsLenList.get(i);
+    }
+    return dimLens;
+  }
+
+  /**
+   * Creates one key generator per entry of {@link #calcDimensionLengths()}, each built from a
+   * single-element length array holding that entry.
+   *
+   * @return the key generators, in the order of the dimension lengths
+   */
+  public KeyGenerator[] createKeyGeneratorForComplexDimension() {
+    int[] dimLens = calcDimensionLengths();
+    KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
+    for (int i = 0; i < dimLens.length; i++) {
+      complexKeyGenerators[i] =
+          KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
+    }
+    return complexKeyGenerators;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public void setTableSpec(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+}