You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/09/21 16:25:19 UTC
[1/3] incubator-carbondata git commit: Corrected file headers and
removed unnecessary open csv files
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 676bd96bd -> 64ddf9fd2
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
deleted file mode 100644
index 195374a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/**
- * Interface for the ResultSetHelperService. Allows the user to define their own ResultSetHelper
- * for use in the CSVWriter.
- */
-public interface ResultSetHelper {
- /**
- * Returns the column Names from the ResultSet.
- *
- * @param rs - ResultSet
- * @return - string array containing the column names.
- * @throws SQLException - thrown by the ResultSet.
- */
- String[] getColumnNames(ResultSet rs) throws SQLException;
-
- /**
- * Returns the column values from the result set.
- *
- * @param rs - the ResultSet containing the values.
- * @return String Array containing the values.
- * @throws SQLException - thrown by the ResultSet.
- * @throws IOException - thrown by the ResultSet.
- */
- String[] getColumnValues(ResultSet rs) throws SQLException, IOException;
-
- /**
- * Returns the column values from the result set with the values trimmed if desired.
- *
- * @param rs - the ResultSet containing the values.
- * @param trim - values should have white spaces trimmed.
- * @return String Array containing the values.
- * @throws SQLException - thrown by the ResultSet.
- * @throws IOException - thrown by the ResultSet.
- */
- String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException;
-
- /**
- * Returns the column values from the result set with the values trimmed if desired.
- * Also format the date and time columns based on the format strings passed in.
- *
- * @param rs - the ResultSet containing the values.
- * @param trim - values should have white spaces trimmed.
- * @param dateFormatString - format String for dates.
- * @param timeFormatString - format String for timestamps.
- * @return String Array containing the values.
- * @throws SQLException - thrown by the ResultSet.
- * @throws IOException - thrown by the ResultSet.
- */
- String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
- String timeFormatString) throws SQLException, IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
deleted file mode 100644
index 3d15949..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/ResultSetHelperService.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.io.Reader;
-import java.math.BigDecimal;
-import java.sql.Clob;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-/**
- * helper class for processing JDBC ResultSet objects.
- */
-public class ResultSetHelperService implements ResultSetHelper {
- public static final int CLOBBUFFERSIZE = 2048;
-
- // note: we want to maintain compatibility with Java 5 VM's
- // These types don't exist in Java 5
- static final int NVARCHAR = -9;
- static final int NCHAR = -15;
- static final int LONGNVARCHAR = -16;
- static final int NCLOB = 2011;
-
- static final String DEFAULT_DATE_FORMAT = "dd-MMM-yyyy";
- static final String DEFAULT_TIMESTAMP_FORMAT = "dd-MMM-yyyy HH:mm:ss";
-
- /**
- * Default Constructor.
- */
- public ResultSetHelperService() {
- }
-
- private static String read(Clob c) throws SQLException, IOException {
- StringBuilder sb = new StringBuilder((int) c.length());
- Reader r = c.getCharacterStream();
- try {
- char[] cbuf = new char[CLOBBUFFERSIZE];
- int n;
- while ((n = r.read(cbuf, 0, cbuf.length)) != -1) {
- sb.append(cbuf, 0, n);
- }
- } finally {
- r.close();
- }
- return sb.toString();
-
- }
-
- /**
- * Returns the column names from the result set.
- *
- * @param rs - ResultSet
- * @return - a string array containing the column names.
- * @throws SQLException - thrown by the result set.
- */
- public String[] getColumnNames(ResultSet rs) throws SQLException {
- List<String> names = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- //CHECKSTYLE:OFF Approval No:Approval-V1R2C10_010
- ResultSetMetaData metadata = rs.getMetaData();
- //CHECKSTYLE:ON
- for (int i = 0; i < metadata.getColumnCount(); i++) {
- names.add(metadata.getColumnName(i + 1));
- }
-
- String[] nameArray = new String[names.size()];
- return names.toArray(nameArray);
- }
-
- /**
- * Get all the column values from the result set.
- *
- * @param rs - the ResultSet containing the values.
- * @return - String array containing all the column values.
- * @throws SQLException - thrown by the result set.
- * @throws IOException - thrown by the result set.
- */
- public String[] getColumnValues(ResultSet rs) throws SQLException, IOException {
- return this.getColumnValues(rs, false, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
- }
-
- /**
- * Get all the column values from the result set.
- *
- * @param rs - the ResultSet containing the values.
- * @param trim - values should have white spaces trimmed.
- * @return - String array containing all the column values.
- * @throws SQLException - thrown by the result set.
- * @throws IOException - thrown by the result set.
- */
- public String[] getColumnValues(ResultSet rs, boolean trim) throws SQLException, IOException {
- return this.getColumnValues(rs, trim, DEFAULT_DATE_FORMAT, DEFAULT_TIMESTAMP_FORMAT);
- }
-
- /**
- * Get all the column values from the result set.
- *
- * @param rs - the ResultSet containing the values.
- * @param trim - values should have white spaces trimmed.
- * @param dateFormatString - format String for dates.
- * @param timeFormatString - format String for timestamps.
- * @return - String array containing all the column values.
- * @throws SQLException - thrown by the result set.
- * @throws IOException - thrown by the result set.
- */
- public String[] getColumnValues(ResultSet rs, boolean trim, String dateFormatString,
- String timeFormatString) throws SQLException, IOException {
- List<String> values = new ArrayList<>();
- ResultSetMetaData metadata = rs.getMetaData();
-
- for (int i = 0; i < metadata.getColumnCount(); i++) {
- values.add(getColumnValue(rs, metadata.getColumnType(i + 1), i + 1, trim, dateFormatString,
- timeFormatString));
- }
-
- String[] valueArray = new String[values.size()];
- return values.toArray(valueArray);
- }
-
- /**
- * changes an object to a String.
- *
- * @param obj - Object to format.
- * @return - String value of an object or empty string if the object is null.
- */
- protected String handleObject(Object obj) {
- return obj == null ? "" : String.valueOf(obj);
- }
-
- /**
- * changes a BigDecimal to String.
- *
- * @param decimal - BigDecimal to format
- * @return String representation of a BigDecimal or empty string if null
- */
- protected String handleBigDecimal(BigDecimal decimal) {
- return decimal == null ? "" : decimal.toString();
- }
-
- /**
- * Retrieves the string representation of an Long value from the result set.
- *
- * @param rs - Result set containing the data.
- * @param columnIndex - index to the column of the long.
- * @return - the string representation of the long
- * @throws SQLException - thrown by the result set on error.
- */
- protected String handleLong(ResultSet rs, int columnIndex) throws SQLException {
- long lv = rs.getLong(columnIndex);
- return rs.wasNull() ? "" : Long.toString(lv);
- }
-
- /**
- * Retrieves the string representation of an Integer value from the result set.
- *
- * @param rs - Result set containing the data.
- * @param columnIndex - index to the column of the integer.
- * @return - string representation of the Integer.
- * @throws SQLException - returned from the result set on error.
- */
- protected String handleInteger(ResultSet rs, int columnIndex) throws SQLException {
- int i = rs.getInt(columnIndex);
- return rs.wasNull() ? "" : Integer.toString(i);
- }
-
- /**
- * Retrieves a date from the result set.
- *
- * @param rs - Result set containing the data
- * @param columnIndex - index to the column of the date
- * @param dateFormatString - format for the date
- * @return - formatted date.
- * @throws SQLException - returned from the result set on error.
- */
- protected String handleDate(ResultSet rs, int columnIndex, String dateFormatString)
- throws SQLException {
- java.sql.Date date = rs.getDate(columnIndex);
- String value = null;
- if (date != null) {
- SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatString);
- value = dateFormat.format(date);
- }
- return value;
- }
-
- /**
- * Return time read from ResultSet.
- *
- * @param time time read from ResultSet
- * @return String version of time or null if time is null.
- */
- protected String handleTime(Time time) {
- return time == null ? null : time.toString();
- }
-
- /**
- * The formatted timestamp.
- *
- * @param timestamp - timestamp read from resultset
- * @param timestampFormatString - format string
- * @return - formatted time stamp.
- */
- protected String handleTimestamp(Timestamp timestamp, String timestampFormatString) {
- SimpleDateFormat timeFormat = new SimpleDateFormat(timestampFormatString);
- return timestamp == null ? null : timeFormat.format(timestamp);
- }
-
- private String getColumnValue(ResultSet rs, int colType, int colIndex, boolean trim,
- String dateFormatString, String timestampFormatString) throws SQLException, IOException {
-
- String value = "";
-
- switch (colType) {
- case Types.BIT:
- case Types.JAVA_OBJECT:
- value = handleObject(rs.getObject(colIndex));
- break;
- case Types.BOOLEAN:
- boolean b = rs.getBoolean(colIndex);
- value = Boolean.valueOf(b).toString();
- break;
- case NCLOB: // todo : use rs.getNClob
- case Types.CLOB:
- Clob c = rs.getClob(colIndex);
- if (c != null) {
- value = read(c);
- }
- break;
- case Types.BIGINT:
- value = handleLong(rs, colIndex);
- break;
- case Types.DECIMAL:
- case Types.DOUBLE:
- case Types.FLOAT:
- case Types.REAL:
- case Types.NUMERIC:
- value = handleBigDecimal(rs.getBigDecimal(colIndex));
- break;
- case Types.INTEGER:
- case Types.TINYINT:
- case Types.SMALLINT:
- value = handleInteger(rs, colIndex);
- break;
- case Types.DATE:
- value = handleDate(rs, colIndex, dateFormatString);
- break;
- case Types.TIME:
- value = handleTime(rs.getTime(colIndex));
- break;
- case Types.TIMESTAMP:
- value = handleTimestamp(rs.getTimestamp(colIndex), timestampFormatString);
- break;
- case NVARCHAR: // todo : use rs.getNString
- case NCHAR: // todo : use rs.getNString
- case LONGNVARCHAR: // todo : use rs.getNString
- case Types.LONGVARCHAR:
- case Types.VARCHAR:
- case Types.CHAR:
- value = getColumnValue(rs, colIndex, trim);
- break;
- default:
- value = "";
- }
-
- if (value == null) {
- value = "";
- }
-
- return value;
- }
-
- /**
- * @param rs
- * @param colIndex
- * @param trim
- * @return
- * @throws SQLException
- */
- public String getColumnValue(ResultSet rs, int colIndex, boolean trim) throws SQLException {
- String value;
- String columnValue = rs.getString(colIndex);
- if (trim && columnValue != null) {
- value = columnValue.trim();
- } else {
- value = columnValue;
- }
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
deleted file mode 100644
index d6932cd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataPartitionRDD.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.spark.PartitionResult
-import org.apache.carbondata.spark.partition.api.impl.CSVFilePartitioner
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-
-class CarbonSparkRawDataPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
- extends Partition {
-
- override val index: Int = idx
- val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * This RDD class is used to create splits the fact csv store to various partitions as per
- * configuration and compute each split in the respective node located in the server.
- * .
- */
-class CarbonDataPartitionRDD[K, V](
- sc: SparkContext,
- results: PartitionResult[K, V],
- databaseName: String,
- tableName: String,
- sourcePath: String,
- targetFolder: String,
- requiredColumns: Array[String],
- headers: String,
- delimiter: String,
- quoteChar: String,
- escapeChar: String,
- multiLine: Boolean,
- partitioner: Partitioner)
- extends RDD[(K, V)](sc, Nil) with Logging {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil
- .getPartitionSplits(sourcePath, partitioner.nodeList, partitioner.partitionCount)
- splits.zipWithIndex.map {s =>
- new CarbonSparkRawDataPartition(id, s._2, s._1)
- }
- }
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- new Iterator[(K, V)] {
- val split = theSplit.asInstanceOf[CarbonSparkRawDataPartition]
- StandardLogService
- .setThreadName(split.serializableHadoopSplit.value.getPartition.getUniqueID, null)
- logInfo("Input split: " + split.serializableHadoopSplit.value)
-
- val csvPart = new CSVFilePartitioner(partitioner.partitionClass, sourcePath)
- csvPart.splitFile(databaseName, tableName,
- split.serializableHadoopSplit.value.getPartition.getFilesPath, targetFolder,
- partitioner.nodeList.toList.asJava, partitioner.partitionCount, partitioner.partitionColumn,
- requiredColumns, delimiter, quoteChar, headers, escapeChar, multiLine)
-
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished) {
- finished = true
- finished
- }
- else {
- !finished
- }
- }
-
- override def next(): (K, V) = {
- results.getKey(partitioner.partitionCount, csvPart.isPartialSuccess)
- }
- }
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonSparkRawDataPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name : " + s.head + s.length)
- s
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b9289ce..a64cf27 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -62,37 +62,6 @@ object CarbonDataRDDFactory extends Logging {
val logger = LogServiceFactory.getLogService(CarbonDataRDDFactory.getClass.getName)
- // scalastyle:off
- def partitionCarbonData(sc: SparkContext,
- databaseName: String,
- tableName: String,
- sourcePath: String,
- targetFolder: String,
- requiredColumns: Array[String],
- headers: String,
- delimiter: String,
- quoteChar: String,
- escapeChar: String,
- multiLine: Boolean,
- partitioner: Partitioner): String = {
- // scalastyle:on
- val status = new
- CarbonDataPartitionRDD(sc, new PartitionResultImpl(), databaseName, tableName, sourcePath,
- targetFolder, requiredColumns, headers, delimiter, quoteChar, escapeChar, multiLine,
- partitioner
- ).collect
- CarbonDataProcessorUtil
- .renameBadRecordsFromInProgressToNormal("partition/" + databaseName + '/' + tableName)
- var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- status.foreach {
- case (key, value) =>
- if (value) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- }
- }
- loadStatus
- }
-
def mergeCarbonData(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index fa32d99..4c0c79b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.control.Breaks.{break, breakable}
+import au.com.bytecode.opencsv.CSVReader
import org.apache.commons.lang3.{ArrayUtils, StringUtils}
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -38,9 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.{CSVParser, CSVReader}
+import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
@@ -500,7 +499,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
FileFactory.getFileType(preDefDictFilePath))
csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
- CSVReader.DEFAULT_SKIP_LINES, new CSVParser(carbonLoadModel.getCsvDelimiter.charAt(0)))
+ carbonLoadModel.getCsvDelimiter.charAt(0))
// read the column data to list iterator
colDictData = csvReader.readAll.iterator
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 818aa4a..e714520 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -49,7 +49,6 @@ import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.load.CarbonLoadModel
-import org.apache.carbondata.spark.partition.reader.CSVWriter
import org.apache.carbondata.spark.rdd._
/**
@@ -58,6 +57,16 @@ import org.apache.carbondata.spark.rdd._
object GlobalDictionaryUtil extends Logging {
/**
+ * The default CSV field separator (comma).
+ */
+ val DEFAULT_SEPARATOR: Char = ','
+ /**
+ * The default CSV quote character (double quote), used when the load
+ * model specifies no quote character.
+ */
+ val DEFAULT_QUOTE_CHARACTER: Char = '"'
+
+ /**
* find columns which need to generate global dictionary.
*
* @param dimensions dimension list of schema
@@ -354,7 +363,7 @@ object GlobalDictionaryUtil extends Logging {
})
.option("delimiter", {
if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
- "" + CSVWriter.DEFAULT_SEPARATOR
+ "" + DEFAULT_SEPARATOR
}
else {
carbonLoadModel.getCsvDelimiter
@@ -367,7 +376,7 @@ object GlobalDictionaryUtil extends Logging {
.option("codec", "gzip")
.option("quote", {
if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
- "" + CSVWriter. DEFAULT_QUOTE_CHARACTER
+ "" + DEFAULT_QUOTE_CHARACTER
}
else {
carbonLoadModel.getQuoteChar
@@ -592,7 +601,7 @@ object GlobalDictionaryUtil extends Logging {
*/
private def parseRecord(x: String, accum: Accumulator[Int],
csvFileColumns: Array[String]) : (String, String) = {
- val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
+ val tokens = x.split("" + DEFAULT_SEPARATOR)
var columnName: String = ""
var value: String = ""
// such as "," , "", throw ex
@@ -713,7 +722,7 @@ object GlobalDictionaryUtil extends Logging {
if (null != readLine) {
val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
- "" + CSVWriter.DEFAULT_SEPARATOR
+ "" + DEFAULT_SEPARATOR
} else {
carbonLoadModel.getCsvDelimiter
}
@@ -756,7 +765,7 @@ object GlobalDictionaryUtil extends Logging {
df.columns
}
else {
- carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
+ carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
}
headers = headers.map(headerName => headerName.trim)
val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -820,7 +829,7 @@ object GlobalDictionaryUtil extends Logging {
var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
getHeaderFormFactFile(carbonLoadModel)
} else {
- carbonLoadModel.getCsvHeader.toLowerCase.split("" + CSVWriter.DEFAULT_SEPARATOR)
+ carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
}
headers = headers.map(headerName => headerName.trim)
// prune columns according to the CSV file header, dimension columns
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 972de05..c9fdd6b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -28,13 +28,12 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.ExtractPythonUDFs
-import org.apache.spark.sql.execution.command.PartitionData
import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, PreWriteCheck}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.optimizer.CarbonOptimizer
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD
@@ -153,45 +152,6 @@ object CarbonContext {
@transient
val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
- /**
- * @param databaseName - Database Name
- * @param tableName - Table Name
- * @param factPath - Raw CSV data path
- * @param targetPath - Target path where the file will be split as per partition
- * @param delimiter - default file delimiter is comma(,)
- * @param quoteChar - default quote character used in Raw CSV file, Default quote
- * character is double quote(")
- * @param fileHeader - Header should be passed if not available in Raw CSV File, else pass null,
- * Header will be read from CSV
- * @param escapeChar - This parameter by default will be null, there wont be any validation if
- * default escape character(\) is found on the RawCSV file
- * @param multiLine - This parameter will be check for end of quote character if escape character
- * & quote character is set.
- * if set as false, it will check for end of quote character within the line
- * and skips only 1 line if end of quote not found
- * if set as true, By default it will check for 10000 characters in multiple
- * lines for end of quote & skip all lines if end of quote not found.
- */
- final def partitionData(
- databaseName: String = null,
- tableName: String,
- factPath: String,
- targetPath: String,
- delimiter: String = ",",
- quoteChar: String = "\"",
- fileHeader: String = null,
- escapeChar: String = null,
- multiLine: Boolean = false)(hiveContext: HiveContext): String = {
- updateCarbonPorpertiesPath(hiveContext)
- var databaseNameLocal = databaseName
- if (databaseNameLocal == null) {
- databaseNameLocal = "default"
- }
- val partitionDataClass = PartitionData(databaseName, tableName, factPath, targetPath, delimiter,
- quoteChar, fileHeader, escapeChar, multiLine)
- partitionDataClass.run(hiveContext)
- partitionDataClass.partitionStatus
- }
final def updateCarbonPorpertiesPath(hiveContext: HiveContext) {
val carbonPropertiesFilePath = hiveContext.getConf("carbon.properties.filepath", null)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 89c0831..11ec586 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1152,28 +1152,6 @@ private[sql] case class LoadTable(
carbonLoadModel.setColDictFilePath(columnDict)
carbonLoadModel.setDirectLoad(true)
}
- else {
- val fileType = FileFactory.getFileType(partitionLocation)
- if (FileFactory.isFileExist(partitionLocation, fileType)) {
- val file = FileFactory.getCarbonFile(partitionLocation, fileType)
- CarbonUtil.deleteFoldersAndFiles(file)
- }
- partitionLocation += System.currentTimeMillis()
- FileFactory.mkdirs(partitionLocation, fileType)
- LOGGER.info("Initiating Data Partitioning for the Table : (" +
- dbName + "." + tableName + ")")
- partitionStatus = CarbonContext.partitionData(
- dbName,
- tableName,
- factPath,
- partitionLocation,
- delimiter,
- quoteChar,
- fileHeader,
- escapeChar, multiLine)(sqlContext.asInstanceOf[HiveContext])
- carbonLoadModel.setFactFilePath(FileUtils.getPaths(partitionLocation))
- carbonLoadModel.setColDictFilePath(columnDict)
- }
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath)
CarbonDataRDDFactory
@@ -1226,34 +1204,6 @@ private[sql] case class LoadTable(
}
-private[sql] case class PartitionData(databaseName: String, tableName: String, factPath: String,
- targetPath: String, delimiter: String, quoteChar: String,
- fileHeader: String, escapeChar: String, multiLine: Boolean)
- extends RunnableCommand {
-
- var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val identifier = TableIdentifier(tableName, Option(databaseName))
- val relation = CarbonEnv.getInstance(sqlContext)
- .carbonCatalog.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- val dimNames = relation.tableMeta.carbonTable
- .getDimensionByTableName(tableName).asScala.map(_.getColName)
- val msrNames = relation.tableMeta.carbonTable
- .getDimensionByTableName(tableName).asScala.map(_.getColName)
- val targetFolder = targetPath
- partitionStatus = CarbonDataRDDFactory.partitionCarbonData(
- sqlContext.sparkContext, databaseName,
- tableName, factPath, targetFolder, (dimNames ++ msrNames).toArray
- , fileHeader, delimiter,
- quoteChar, escapeChar, multiLine, relation.tableMeta.partitioner)
- if (partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) {
- logInfo("Bad Record Found while partitioning data")
- }
- Seq.empty
- }
-}
-
private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
tableName: String)
extends RunnableCommand {
[3/3] incubator-carbondata git commit:
[CARBONDATA-215][CARBONDATA-216] Corrected file headers and removed
unnecessary open csv files This closes #186
Posted by ch...@apache.org.
[CARBONDATA-215][CARBONDATA-216] Corrected file headers and removed unnecessary OpenCSV files. This closes #186
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/64ddf9fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/64ddf9fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/64ddf9fd
Branch: refs/heads/master
Commit: 64ddf9fd2d4a0eda489baa2c51691c725f8cdfbf
Parents: 676bd96 a1b3ad3
Author: chenliang613 <ch...@apache.org>
Authored: Wed Sep 21 09:24:56 2016 -0700
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Sep 21 09:24:56 2016 -0700
----------------------------------------------------------------------
.../carbondata/scan/QueryExecutor_UT.java | 35 --
.../spark/partition/api/DataPartitioner.java | 9 -
.../spark/partition/api/Partition.java | 9 -
.../partition/api/impl/CSVFilePartitioner.java | 365 ------------
.../api/impl/DataPartitionerProperties.java | 3 -
.../partition/api/impl/DefaultLoadBalancer.java | 9 -
.../spark/partition/api/impl/PartitionImpl.java | 9 -
.../api/impl/QueryPartitionHelper.java | 9 -
.../spark/partition/reader/CSVIterator.java | 74 ---
.../spark/partition/reader/CSVParser.java | 559 -------------------
.../spark/partition/reader/CSVReader.java | 496 ----------------
.../spark/partition/reader/CSVWriter.java | 396 -------------
.../spark/partition/reader/LineReader.java | 68 ---
.../spark/partition/reader/ResultSetHelper.java | 87 ---
.../reader/ResultSetHelperService.java | 327 -----------
.../spark/rdd/CarbonDataPartitionRDD.scala | 112 ----
.../spark/rdd/CarbonDataRDDFactory.scala | 31 -
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 7 +-
.../spark/util/GlobalDictionaryUtil.scala | 23 +-
.../org/apache/spark/sql/CarbonContext.scala | 42 +-
.../execution/command/carbonTableSchema.scala | 50 --
21 files changed, 20 insertions(+), 2700 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-carbondata git commit: Corrected file headers and
removed unnecessary open csv files
Posted by ch...@apache.org.
Corrected file headers and removed unnecessary open csv files
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a1b3ad3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a1b3ad3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a1b3ad3b
Branch: refs/heads/master
Commit: a1b3ad3b85ff944ffba2d2f7229f6ca680b30863
Parents: 676bd96
Author: ravipesala <ra...@gmail.com>
Authored: Wed Sep 21 21:30:55 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Sep 21 21:30:55 2016 +0530
----------------------------------------------------------------------
.../carbondata/scan/QueryExecutor_UT.java | 35 --
.../spark/partition/api/DataPartitioner.java | 9 -
.../spark/partition/api/Partition.java | 9 -
.../partition/api/impl/CSVFilePartitioner.java | 365 ------------
.../api/impl/DataPartitionerProperties.java | 3 -
.../partition/api/impl/DefaultLoadBalancer.java | 9 -
.../spark/partition/api/impl/PartitionImpl.java | 9 -
.../api/impl/QueryPartitionHelper.java | 9 -
.../spark/partition/reader/CSVIterator.java | 74 ---
.../spark/partition/reader/CSVParser.java | 559 -------------------
.../spark/partition/reader/CSVReader.java | 496 ----------------
.../spark/partition/reader/CSVWriter.java | 396 -------------
.../spark/partition/reader/LineReader.java | 68 ---
.../spark/partition/reader/ResultSetHelper.java | 87 ---
.../reader/ResultSetHelperService.java | 327 -----------
.../spark/rdd/CarbonDataPartitionRDD.scala | 112 ----
.../spark/rdd/CarbonDataRDDFactory.scala | 31 -
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 7 +-
.../spark/util/GlobalDictionaryUtil.scala | 23 +-
.../org/apache/spark/sql/CarbonContext.scala | 42 +-
.../execution/command/carbonTableSchema.scala | 50 --
21 files changed, 20 insertions(+), 2700 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/core/src/test/java/org/apache/carbondata/scan/QueryExecutor_UT.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/QueryExecutor_UT.java b/core/src/test/java/org/apache/carbondata/scan/QueryExecutor_UT.java
deleted file mode 100644
index 88fa999..0000000
--- a/core/src/test/java/org/apache/carbondata/scan/QueryExecutor_UT.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 2012
- * =====================================
- */
-package org.apache.carbondata.scan;
-
-import junit.framework.TestCase;
-
-public class QueryExecutor_UT extends TestCase {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
index 603e01e..58f3a2d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
@@ -17,15 +17,6 @@
* under the License.
*/
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
package org.apache.carbondata.spark.partition.api;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
index 5d94fa0..61639d3 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
@@ -17,15 +17,6 @@
* under the License.
*/
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
package org.apache.carbondata.spark.partition.api;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/CSVFilePartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/CSVFilePartitioner.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/CSVFilePartitioner.java
deleted file mode 100644
index c61aafa..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/CSVFilePartitioner.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.reader.CSVParser;
-import org.apache.carbondata.spark.partition.reader.CSVReader;
-import org.apache.carbondata.spark.partition.reader.CSVWriter;
-
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.sql.execution.command.Partitioner;
-
-/**
- * Split the CSV file into the number of partitions using the given partition information
- */
-public class CSVFilePartitioner {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CSVFilePartitioner.class.getName());
- private String partitionerClass;
- private String sourceFilesBasePath;
- private boolean partialSuccess;
- /**
- * badRecordslogger
- */
- private BadRecordslogger badRecordslogger;
-
- /**
- * @param partitionerClass
- */
- public CSVFilePartitioner(String partitionerClass, String sourceFilesBasePath) {
- this.partitionerClass = partitionerClass;
- this.sourceFilesBasePath = sourceFilesBasePath;
- }
-
- public boolean isPartialSuccess() {
- return partialSuccess;
- }
-
- @Deprecated public void splitFile(String databaseName, String tableName,
- List<String> sourceFilePath, String targetFolder, List<String> nodes, int partitionCount,
- String[] partitionColumn, String[] requiredColumns, String delimiter, String quoteChar,
- String fileHeader, String escapeChar, boolean multiLine) throws Exception {
- LOGGER
- .info("Processing file split: " + sourceFilePath);
-
- // Create the target folder
- FileFactory.mkdirs(targetFolder, FileFactory.getFileType(targetFolder));
-
- String[] headerColumns = null;
-
- HashMap<Partition, CSVWriter> outputStreamsMap =
- new HashMap<Partition, CSVWriter>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- String key = databaseName + '_' + tableName;
- badRecordslogger = new BadRecordslogger(key, "Partition_" + System.currentTimeMillis() + ".log",
- getBadLogStoreLocation("partition/" + databaseName + '/' + tableName));
-
- CSVReader dataInputStream = null;
-
- long recordCounter = 0;
-
- CSVParser customParser = getCustomParser(delimiter, quoteChar, escapeChar);
-
- for (int i = 0; i < sourceFilePath.size(); i++) {
- try {
- CarbonFile file = FileFactory
- .getCarbonFile(sourceFilePath.get(i), FileFactory.getFileType(sourceFilePath.get(i)));
- // File file = new File(sourceFilePath);
- String fileAbsolutePath = file.getAbsolutePath();
- String fileName = null;
- if (!sourceFilesBasePath.endsWith(".csv") && fileAbsolutePath
- .startsWith(sourceFilesBasePath)) {
- if (sourceFilesBasePath.endsWith(File.separator)) {
- fileName = fileAbsolutePath.substring(sourceFilesBasePath.length())
- .replace(File.separator, "_");
- } else {
- fileName = fileAbsolutePath.substring(sourceFilesBasePath.length() + 1)
- .replace(File.separator, "_");
- }
- } else {
- fileName = file.getName();
- }
-
- // Read and prepare columns from first row in file
- DataInputStream inputStream = FileFactory.getDataInputStream(sourceFilePath.get(i),
- FileFactory.getFileType(sourceFilePath.get(i)));
- if (fileName.endsWith(".gz")) {
- GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
- dataInputStream =
- new CSVReader(new InputStreamReader(gzipInputStream, Charset.defaultCharset()),
- CSVReader.DEFAULT_SKIP_LINES, customParser);
- fileName = fileName.substring(0, fileName.indexOf(".gz"));
- } else if (fileName.endsWith(".bz2")) {
- BZip2CompressorInputStream stream = new BZip2CompressorInputStream(inputStream);
- dataInputStream = new CSVReader(new InputStreamReader(stream, Charset.defaultCharset()),
- CSVReader.DEFAULT_SKIP_LINES, customParser);
- fileName = fileName.substring(0, fileName.indexOf(".bz2"));
- } else if (fileName.endsWith(".csv")) {
- dataInputStream =
- new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset()),
- CSVReader.DEFAULT_SKIP_LINES, customParser);
- fileName = fileName.substring(0, fileName.indexOf(".csv"));
- } else {
- LOGGER.info("Processing file split: Unsupported File Extension: Skipping File : "
- + file.getAbsolutePath());
- partialSuccess = true;
- return;
- }
- dataInputStream.setBadRecordsLogger(badRecordslogger);
- if (fileHeader == null || fileHeader.length() == 0) {
- headerColumns = dataInputStream.readNext();
- } else {
- headerColumns = fileHeader.split(",");
- }
- if (null == headerColumns) {
- LOGGER.info("Csv file does not contain the header column neither the headers are "
- + "passed in DDL or API. Skipping file :: " + sourceFilePath);
- partialSuccess = true;
- return;
- }
- int[] indexes = pruneColumnsAndGetIndexes(headerColumns, requiredColumns);
-
- // In case there is a dummy measure required columns length and
- // header columns length will not be equal
- if ((null == fileHeader || 0 == fileHeader.length()) && (0 == indexes.length) && (
- fileHeader.length() != indexes.length)) {
- LOGGER.info("Column headers are invalid. They do not match with the schema headers."
- + "Skipping file :: " + sourceFilePath);
- partialSuccess = true;
- return;
- }
-
- partitionData(targetFolder, nodes, partitionCount, partitionColumn, headerColumns,
- outputStreamsMap, dataInputStream, recordCounter, fileName, indexes, fileAbsolutePath);
- } catch (IOException e) {
- LOGGER.error(e, e.getMessage());
- } finally {
- CarbonUtil.closeStreams(dataInputStream);
-
- for (CSVWriter dataOutStream : outputStreamsMap.values()) {
- CarbonUtil.closeStreams(dataOutStream);
- }
- badRecordslogger.closeStreams();
- }
- }
- }
-
- private void partitionData(String targetFolder, List<String> nodes, int partitionCount,
- String[] partitionColumn, String[] headerColumns,
- HashMap<Partition, CSVWriter> outputStreamsMap, CSVReader dataInputStream, long recordCounter,
- String fileName, int[] indexes, String fileAbsolutePath)
- throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
- DataPartitioner dataPartitioner =
- getDataPartitioner(targetFolder, nodes, partitionCount, partitionColumn, headerColumns);
-
- //Get partitions and create output streams
- List<Partition> allPartitions = dataPartitioner.getAllPartitions();
-
- loopPartitionsAndPopulateOutStreamMap(outputStreamsMap, fileName, allPartitions);
-
- //Write header in all the target files
- for (CSVWriter dataOutStream : outputStreamsMap.values()) {
- dataOutStream.writeNext(pruneColumns(headerColumns, indexes));
- }
-
- recordCounter = writeTargetStream(outputStreamsMap, dataInputStream, recordCounter, indexes,
- dataPartitioner, headerColumns, fileAbsolutePath);
-
- LOGGER
- .info("Processed Record count: " + recordCounter);
- }
-
- private CSVParser getCustomParser(String delimiter, String quoteChar, String escapeChar) {
- CSVParser customParser = null;
- boolean ignoreQuote = false;
- boolean ignoreEscape = false;
- char defaultQuoteChar = CSVParser.DEFAULT_QUOTE_CHARACTER;
- char defaultEscapeChar = CSVParser.DEFAULT_ESCAPE_CHARACTER;
- if (quoteChar == null || quoteChar.isEmpty() || quoteChar.trim().isEmpty()) {
- ignoreQuote = true;
- } else {
- ignoreQuote = false;
- defaultQuoteChar = quoteChar.charAt(0);
- }
- if (escapeChar == null || escapeChar.isEmpty() || escapeChar.trim().isEmpty()) {
- ignoreEscape = true;
- } else {
- ignoreEscape = false;
- defaultEscapeChar = escapeChar.charAt(0);
- }
- delimiter = CarbonUtil.unescapeChar(delimiter);
- customParser = new CSVParser(delimiter.charAt(0), defaultQuoteChar, defaultEscapeChar,
- CSVParser.DEFAULT_STRICT_QUOTES, CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE, ignoreQuote,
- ignoreEscape);
- return customParser;
- }
-
- private DataPartitioner getDataPartitioner(String targetFolder, List<String> nodes,
- int partitionCount, String[] partitionColumn, String[] headerColumns)
- throws InstantiationException, IllegalAccessException, ClassNotFoundException {
- DataPartitioner dataPartitioner =
- (DataPartitioner) Class.forName(partitionerClass).newInstance();
-
- Partitioner partitioner = new Partitioner(partitionerClass, partitionColumn, partitionCount,
- nodes.toArray(new String[nodes.size()]));
- //Initialise the partitioner
- dataPartitioner.initialize(targetFolder, headerColumns, partitioner);
- return dataPartitioner;
- }
-
- private long writeTargetStream(HashMap<Partition, CSVWriter> outputStreamsMap,
- CSVReader dataInputStream, long recordCounter, int[] indexes, DataPartitioner dataPartitioner,
- String[] headerColumns, String fileAbsolutePath) throws IOException {
- String[] record = null;
- Partition tartgetPartition = null;
- CSVWriter targetStream = null;
- record = dataInputStream.readNext();
- int skippedLines = 0;
- if (null == record) {
- return recordCounter;
- } else {
- boolean isEqual = compareHeaderColumnWithFirstRecordInCSV(headerColumns, record);
- if (isEqual) {
- record = dataInputStream.readNext();
- recordCounter++;
- }
- }
- while (null != record) {
- tartgetPartition = dataPartitioner.getPartionForTuple(record, recordCounter);
- targetStream = outputStreamsMap.get(tartgetPartition);
- try {
- targetStream.writeNext(pruneColumns(record, indexes));
- } catch (ArrayIndexOutOfBoundsException e) {
- partialSuccess = true;
- skippedLines++;
- badRecordslogger.addBadRecordsToBilder(record, record.length,
- "No. of columns not matched with table columns", null);
- LOGGER.error("BAD Record Found: No. of columns not matched with table columns, "
- + "Skipping line: (" + (recordCounter + 1) + ") in File :" + fileAbsolutePath);
- } catch (Exception e) {
- partialSuccess = true;
- skippedLines++;
- badRecordslogger.addBadRecordsToBilder(record, record.length, e.getMessage(), null);
- LOGGER.info("Exception while processing the record at line " + (recordCounter + 1)
- + " in partiton " + tartgetPartition.getUniqueID());
- } finally {
- record = dataInputStream.readNext();
- recordCounter++;
- }
- }
- if (skippedLines != 0) {
- LOGGER.info("No. of bad records skipped: (" + skippedLines + ") in file:" + fileAbsolutePath);
- }
- return recordCounter;
- }
-
- private boolean compareHeaderColumnWithFirstRecordInCSV(String[] headerColumns,
- String[] firstRecord) {
- String header = StringUtils.join(headerColumns, ',');
- String record = StringUtils.join(firstRecord, ',');
- if (header != null && header.equals(record)) {
- return true;
- }
- return false;
- }
-
- private void loopPartitionsAndPopulateOutStreamMap(HashMap<Partition, CSVWriter> outputStreamsMap,
- String fileName, List<Partition> allPartitions) throws IOException {
- for (Partition partition : allPartitions) {
- String targetFolderPath = partition.getFilePath();
- FileType fileType = FileFactory.getFileType(targetFolderPath);
- FileFactory.mkdirs(targetFolderPath, fileType);
- outputStreamsMap.put(partition, new CSVWriter(new OutputStreamWriter(FileFactory
- .getDataOutputStream(
- targetFolderPath + '/' + fileName + '_' + partition.getUniqueID() + ".csv", fileType,
- (short) 1), Charset.defaultCharset())));
- }
- }
-
- private int[] pruneColumnsAndGetIndexes(String[] headerColumns, String[] requiredColumns) {
- if (requiredColumns == null) {
- requiredColumns = headerColumns;
- }
- List<Integer> indexesList = new ArrayList<Integer>();
- for (int i = 0; i < headerColumns.length; i++) {
- for (int j = 0; j < requiredColumns.length; j++) {
- if (headerColumns[i].equalsIgnoreCase(requiredColumns[j])) {
- indexesList.add(i);
- break;
- }
- }
- }
- int[] indexes = new int[indexesList.size()];
- for (int i = 0; i < indexesList.size(); i++) {
- indexes[i] = indexesList.get(i);
- }
- return indexes;
- }
-
- private String[] pruneColumns(String[] tuple, int[] indexes) {
- String[] sb = new String[indexes.length];
- int length = indexes.length;
- for (int i = 0; i < length; i++) {
- sb[i] = tuple[indexes[i]];
- }
- return sb;
- }
-
- private String getBadLogStoreLocation(String storeLocation) {
- String badLogStoreLocation =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
- badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
- return badLogStoreLocation;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
index 8a80c4e..bc6e54f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
@@ -17,9 +17,6 @@
* under the License.
*/
-/**
- *
- */
package org.apache.carbondata.spark.partition.api.impl;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
index 67c257d..9bee8a2 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
@@ -17,15 +17,6 @@
* under the License.
*/
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
package org.apache.carbondata.spark.partition.api.impl;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
index b49db59..bd7cc42 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
@@ -17,15 +17,6 @@
* under the License.
*/
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
package org.apache.carbondata.spark.partition.api.impl;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
index 37d772d..72e7b08 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -17,15 +17,6 @@
* under the License.
*/
-/**
- * Copyright Notice
- * =====================================
- * This file contains proprietary information of
- * Huawei Technologies India Pvt Ltd.
- * Copying or reproduction without prior written approval is prohibited.
- * Copyright (c) 1997
- * =====================================
- */
package org.apache.carbondata.spark.partition.api.impl;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVIterator.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVIterator.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVIterator.java
deleted file mode 100644
index d57f708..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVIterator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Provides an Iterator over the data found in opencsv.
- */
-public class CSVIterator implements Iterator<String[]> {
- private CSVReader reader;
- private String[] nextLine;
-
- /**
- * @param reader reader for the csv data.
- * @throws IOException if unable to read data from the reader.
- */
- public CSVIterator(CSVReader reader) throws IOException {
- this.reader = reader;
- nextLine = reader.readNext();
- }
-
- /**
- * Returns true if the iteration has more elements.
- * In other words, returns true if next() would return an element rather
- * than throwing an exception.
- *
- * @return true if the CSVIterator has more elements.
- */
- public boolean hasNext() {
- return nextLine != null;
- }
-
- /**
- * Returns the next elenebt in the iterator.
- *
- * @return The next element of the iterator.
- */
- public String[] next() {
- String[] temp = nextLine;
- try {
- nextLine = reader.readNext();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return temp;
- }
-
- /**
- * This method is not supported by openCSV and will throw a UnsupportedOperationException
- * if called.
- */
- public void remove() {
- throw new UnsupportedOperationException("This is a read only iterator.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVParser.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVParser.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVParser.java
deleted file mode 100644
index a052f31..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVParser.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * A very simple CSV parser released under a commercial-friendly license.
- * This just implements splitting a single line into fields.
- *
- * @author Glen Smith
- * @author Rainer Pruy
- */
-public class CSVParser {
-
- /**
- * The default separator to use if none is supplied to the constructor.
- */
- public static final char DEFAULT_SEPARATOR = ',';
- /**
- * The average size of a line read by openCSV (used for setting the size of StringBuilders).
- */
- public static final int INITIAL_READ_SIZE = 128;
- /**
- * The default quote character to use if none is supplied to the
- * constructor.
- */
- public static final char DEFAULT_QUOTE_CHARACTER = '"';
- /**
- * The default escape character to use if none is supplied to the
- * constructor.
- */
- public static final char DEFAULT_ESCAPE_CHARACTER = '\\';
- /**
- * The default strict quote behavior to use if none is supplied to the
- * constructor.
- */
- public static final boolean DEFAULT_STRICT_QUOTES = false;
- /**
- * The default leading whitespace behavior to use if none is supplied to the
- * constructor.
- */
- public static final boolean DEFAULT_IGNORE_LEADING_WHITESPACE = true;
- /**
- * If the quote character is set to null then there is no quote character.
- */
- public static final boolean DEFAULT_IGNORE_QUOTATIONS = false;
- /**
- * This is the "null" character - if a value is set to this then it is ignored.
- */
- public static final char NULL_CHARACTER = '\0';
- /**
- * This is the character that the CSVParser will treat as the separator.
- */
- private final char separator;
- /**
- * This is the character that the CSVParser will treat as the quotation character.
- */
- private final char quotechar;
- /**
- * This is the character that the CSVParser will treat as the escape character.
- */
- private final char escape;
- /**
- * Determines if the field is between quotes (true) or between separators (false).
- */
- private final boolean strictQuotes;
- /**
- * Ignore any leading white space at the start of the field.
- */
- private final boolean ignoreLeadingWhiteSpace;
- /**
- * Skip over quotation characters when parsing.
- */
- private final boolean ignoreQuotations;
- private boolean ignoreEscapeChar;
- private String pending;
- private boolean inField;
- private int charCountInsideQuote;
-
- /**
- * Constructs CSVParser using a comma for the separator.
- */
- public CSVParser() {
- this(DEFAULT_SEPARATOR, DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVParser with supplied separator.
- *
- * @param separator the delimiter to use for separating entries.
- */
- public CSVParser(char separator) {
- this(separator, DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVParser with supplied separator and quote char.
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- */
- public CSVParser(char separator, char quotechar) {
- this(separator, quotechar, DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVReader with supplied separator and quote char.
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- */
- public CSVParser(char separator, char quotechar, char escape) {
- this(separator, quotechar, escape, DEFAULT_STRICT_QUOTES);
- }
-
- /**
- * Constructs CSVParser with supplied separator and quote char.
- * Allows setting the "strict quotes" flag
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param strictQuotes if true, characters outside the quotes are ignored
- */
- public CSVParser(char separator, char quotechar, char escape, boolean strictQuotes) {
- this(separator, quotechar, escape, strictQuotes, DEFAULT_IGNORE_LEADING_WHITESPACE);
- }
-
- /**
- * Constructs CSVParser with supplied separator and quote char.
- * Allows setting the "strict quotes" and "ignore leading whitespace" flags
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param strictQuotes if true, characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace if true, white space in front of a quote in a field is ignored
- */
- public CSVParser(char separator, char quotechar, char escape, boolean strictQuotes,
- boolean ignoreLeadingWhiteSpace) {
- this(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace,
- DEFAULT_IGNORE_QUOTATIONS);
- }
-
- /**
- * Constructs CSVParser with supplied separator and quote char.
- * Allows setting the "strict quotes" and "ignore leading whitespace" flags
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param strictQuotes if true, characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace if true, white space in front of a quote in a field is ignored
- * @param ignoreQuotations if true, treat quotations like any other character.
- */
- public CSVParser(char separator, char quotechar, char escape, boolean strictQuotes,
- boolean ignoreLeadingWhiteSpace, boolean ignoreQuotations) {
- if (anyCharactersAreTheSame(separator, quotechar, escape)) {
- throw new UnsupportedOperationException(
- "The separator, quote, and escape characters must be different!");
- }
- if (separator == NULL_CHARACTER) {
- throw new UnsupportedOperationException("The separator character must be defined!");
- }
- this.separator = separator;
- this.quotechar = quotechar;
- this.escape = escape;
- this.strictQuotes = strictQuotes;
- this.ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace;
- this.ignoreQuotations = ignoreQuotations;
- }
-
- /**
- * Constructs CSVParser with supplied separator and quote char.
- * Allows setting the "strict quotes" and "ignore leading whitespace" flags
- *
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param strictQuotes if true, characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace if true, white space in front of a quote in a field is ignored
- * @param ignoreQuotations if true, treat quotations like any other character.
- */
- public CSVParser(char separator, char quotechar, char escape, boolean strictQuotes,
- boolean ignoreLeadingWhiteSpace, boolean ignoreQuotations, boolean ignoreEscape) {
- this(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace, ignoreQuotations);
- this.ignoreEscapeChar = ignoreEscape;
- }
-
- /**
- * @return The default separator for this parser.
- */
- public char getSeparator() {
- return separator;
- }
-
- /**
- * @return The default quotation character for this parser.
- */
- public char getQuotechar() {
- return quotechar;
- }
-
- /**
- * @return The default escape character for this parser.
- */
- public char getEscape() {
- return escape;
- }
-
- /**
- * @return The default strictQuotes setting for this parser.
- */
- public boolean isStrictQuotes() {
- return strictQuotes;
- }
-
- /**
- * @return The default ignoreLeadingWhiteSpace setting for this parser.
- */
- public boolean isIgnoreLeadingWhiteSpace() {
- return ignoreLeadingWhiteSpace;
- }
-
- /**
- * @return the default ignoreQuotation setting for this parser.
- */
- public boolean isIgnoreQuotations() {
- return ignoreQuotations;
- }
-
- /**
- * checks to see if any two of the three characters are the same. This is because in openCSV
- * the separator, quote, and escape characters must the different.
- *
- * @param separator the defined separator character
- * @param quotechar the defined quotation cahracter
- * @param escape the defined escape character
- * @return true if any two of the three are the same.
- */
- private boolean anyCharactersAreTheSame(char separator, char quotechar, char escape) {
- return isSameCharacter(separator, quotechar) || isSameCharacter(separator, escape)
- || isSameCharacter(quotechar, escape);
- }
-
- /**
- * checks that the two characters are the same and are not the defined NULL_CHARACTER.
- *
- * @param c1 first character
- * @param c2 second character
- * @return true if both characters are the same and are not the defined NULL_CHARACTER
- */
- private boolean isSameCharacter(char c1, char c2) {
- return c1 != NULL_CHARACTER && c1 == c2;
- }
-
- /**
- * @return true if something was left over from last call(s)
- */
- public boolean isPending() {
- return pending != null;
- }
-
- public void setPending(String pending) {
- this.pending = pending;
- }
-
- /**
- * Parses an incoming String and returns an array of elements. This method is used when the
- * data spans multiple lines.
- *
- * @param nextLine current line to be processed
- * @return the comma-tokenized list of elements, or null if nextLine is null
- * @throws IOException if bad things happen during the read
- */
- public String[] parseLineMulti(String nextLine) throws IOException {
- return parseLine(nextLine, true);
- }
-
- /**
- * Parses an incoming String and returns an array of elements. This method is used when all
- * data is contained in a single line.
- *
- * @param nextLine Line to be parsed.
- * @return the comma-tokenized list of elements, or null if nextLine is null
- * @throws IOException if bad things happen during the read
- */
- public String[] parseLine(String nextLine) throws IOException {
- return parseLine(nextLine, false);
- }
-
- /**
- * Parses an incoming String and returns an array of elements.
- *
- * @param nextLine the string to parse
- * @param multi Does it take multiple lines to form a single record.
- * @return the comma-tokenized list of elements, or null if nextLine is null
- * @throws IOException if bad things happen during the read
- */
- private String[] parseLine(String nextLine, boolean multi) throws IOException {
-
- if (!multi && pending != null) {
- pending = null;
- }
-
- if (nextLine == null) {
- if (pending != null) {
- String s = pending;
- pending = null;
- return new String[] { s };
- } else {
- return null;
- }
- }
-
- List<String> tokensOnThisLine = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- //CHECKSTYLE:OFF Approval No:Approval-V1R2C10_010
- StringBuilder sb = new StringBuilder(INITIAL_READ_SIZE);
-
- boolean inQuotes = false;
- // CHECKSTYLE:ON
- if (pending != null) {
- sb.append(pending);
- pending = null;
- inQuotes = !this.ignoreQuotations;//true;
- }
- inQuotes = checkForQuotes(nextLine, tokensOnThisLine, sb, inQuotes);
- // line is done - check status
- if ((inQuotes && !ignoreQuotations)) {
- if (multi) {
- // continuing a quoted section, re-append newline
- sb.append('\n');
- charCountInsideQuote = sb.length() - (sb.indexOf("\"") + 1);
- if (charCountInsideQuote >= 10000) //max char count to wait till the quote terminates
- {
- throw new IOException("Un-terminated quoted field after 10000 characters");
- }
- pending = sb.toString();
- sb = null; // this partial content is not to be added to field list yet
- } else {
- throw new IOException("Un-terminated quoted field at end of CSV line");
- }
- } else {
- inField = false;
- charCountInsideQuote = 0;
- }
-
- if (sb != null) {
- tokensOnThisLine.add(sb.toString());
- }
- return tokensOnThisLine.toArray(new String[tokensOnThisLine.size()]);
-
- }
-
- /**
- * @param nextLine
- * @param tokensOnThisLine
- * @param sb
- * @param inQuotes
- * @return
- */
- private boolean checkForQuotes(String nextLine, List<String> tokensOnThisLine, StringBuilder sb,
- boolean inQuotes) {
- for (int i = 0; i < nextLine.length(); i++) {
-
- char c = nextLine.charAt(i);
- //Naresh 00902756 : Modified to skip escape character check
- if (!this.ignoreEscapeChar && c == this.escape) {
- if (isNextCharacterEscapable(nextLine, inQuotes(inQuotes), i)) {
- i = appendNextCharacterAndAdvanceLoop(nextLine, sb, i);
- }
- } else if (c == quotechar) {
- if (isNextCharacterEscapedQuote(nextLine, inQuotes(inQuotes), i)) {
- i = appendNextCharacterAndAdvanceLoop(nextLine, sb, i);
- } else {
- inQuotes = checkForStrictQuotes(nextLine, sb, inQuotes, i, c);
- }
- inField = !inField;
- } else if (c == separator && !(inQuotes && !ignoreQuotations)) {
- tokensOnThisLine.add(sb.toString());
- sb.setLength(0);
- inField = false;
- } else {
- if (!strictQuotes || (inQuotes && !ignoreQuotations)) {
- sb.append(c);
- inField = true;
- }
- }
- }
- return inQuotes;
- }
-
- /**
- * @param nextLine
- * @param sb
- * @param inQuotes
- * @param i
- * @param c
- * @return
- */
- private boolean checkForStrictQuotes(String nextLine, StringBuilder sb, boolean inQuotes, int i,
- char c) {
- inQuotes = !inQuotes;
-
- // the tricky case of an embedded quote in the middle: a,bc"d"ef,g
- if (!strictQuotes) {
- if (i > 2 //not on the beginning of the line
- && nextLine.charAt(i - 1) != this.separator
- //not at the beginning of an escape sequence
- && nextLine.length() > (i + 1) &&
- //not at the end of an escape sequence
- nextLine.charAt(i + 1) != this.separator) {
- if (ignoreLeadingWhiteSpace && sb.length() > 0 && isAllWhiteSpace(sb)) {
- sb.setLength(0);
- } else {
- sb.append(c);
- }
-
- }
- }
- return inQuotes;
- }
-
- /**
- * Appends the next character in the line to the stringbuffer.
- *
- * @param line - line to process
- * @param sb - contains the processed character
- * @param i - current position in the line.
- * @return new position in the line.
- */
- private int appendNextCharacterAndAdvanceLoop(String line, StringBuilder sb, int i) {
- sb.append(line.charAt(i + 1));
- i++;
- return i;
- }
-
- /**
- * Determines if we can process as if we were in quotes.
- *
- * @param inQuotes - are we currently in quotes.
- * @return - true if we should process as if we are inside quotes.
- */
- private boolean inQuotes(boolean inQuotes) {
- return (inQuotes && !ignoreQuotations) || inField;
- }
-
- /**
- * Checks to see if the character after the index is a quotation character.
- * precondition: the current character is a quote or an escape
- *
- * @param nextLine the current line
- * @param inQuotes true if the current context is quoted
- * @param i current index in line
- * @return true if the following character is a quote
- */
- private boolean isNextCharacterEscapedQuote(String nextLine, boolean inQuotes, int i) {
- return inQuotes // we are in quotes, therefore there can be escaped quotes in here.
- && nextLine.length() > (i + 1) // there is indeed another character to check.
- && isCharacterQuoteCharacter(nextLine.charAt(i + 1));
- }
-
- /**
- * Checks to see if the passed in character is the defined quotation character.
- *
- * @param c source character
- * @return true if c is the defined quotation character
- */
- private boolean isCharacterQuoteCharacter(char c) {
- return c == quotechar;
- }
-
- /**
- * checks to see if the character is the defined escape character.
- *
- * @param c source character
- * @return true if the character is the defined escape character
- */
- private boolean isCharacterEscapeCharacter(char c) {
- return c == escape;
- }
-
- /**
- * Checks to see if the character passed in could be escapable. Escapable characters for
- * openCSV are the quotation character or the escape character.
- *
- * @param c source character
- * @return true if the character could be escapable.
- */
- private boolean isCharacterEscapable(char c) {
- return isCharacterQuoteCharacter(c) || isCharacterEscapeCharacter(c);
- }
-
- /**
- * Checks to see if the character after the current index in a String is an escapable character.
- * Meaning the next character is either a quotation character or the escape char and you are
- * inside quotes.
- * precondition: the current character is an escape
- *
- * @param nextLine the current line
- * @param inQuotes true if the current context is quoted
- * @param i current index in line
- * @return true if the following character is a quote
- */
- protected boolean isNextCharacterEscapable(String nextLine, boolean inQuotes, int i) {
- return inQuotes // we are in quotes, therefore there can be escaped quotes in here.
- && nextLine.length() > (i + 1) // there is indeed another character to check.
- && isCharacterEscapable(nextLine.charAt(i + 1));
- }
-
- /**
- * Checks if every element is the character sequence is whitespace.
- * precondition: sb.length() is greater than 0
- *
- * @param sb A sequence of characters to examine
- * @return true if every character in the sequence is whitespace
- */
- protected boolean isAllWhiteSpace(CharSequence sb) {
- return StringUtils.isWhitespace(sb);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVReader.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVReader.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVReader.java
deleted file mode 100644
index ce3f1b4..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVReader.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
-
-/**
- * A very simple CSV reader released under a commercial-friendly license.
- *
- * @author Glen Smith
- */
-public class CSVReader implements Closeable, Iterable<String[]> {
-
- public static final boolean DEFAULT_KEEP_CR = false;
- public static final boolean DEFAULT_VERIFY_READER = true;
- /**
- * The default line to start reading.
- */
- public static final int DEFAULT_SKIP_LINES = 0;
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CSVReader.class.getName());
- private CSVParser parser;
- private int skipLines;
- private BufferedReader br;
- private LineReader lineReader;
- private boolean hasNext = true;
- private boolean linesSkiped;
- private boolean keepCR;
- private boolean verifyReader;
- private long lineNum;
- private long skippedLines;
- private boolean multiLine = true;
- private BadRecordslogger badRecordslogger;
-
- /**
- * Constructs CSVReader using a comma for the separator.
- *
- * @param reader the reader to an underlying CSV source.
- */
- public CSVReader(Reader reader) {
- this(reader, CSVParser.DEFAULT_SEPARATOR, CSVParser.DEFAULT_QUOTE_CHARACTER,
- CSVParser.DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVReader with supplied separator.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries.
- */
- public CSVReader(Reader reader, char separator) {
- this(reader, separator, CSVParser.DEFAULT_QUOTE_CHARACTER, CSVParser.DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVReader with supplied separator and quote char.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- */
- public CSVReader(Reader reader, char separator, char quotechar) {
- this(reader, separator, quotechar, CSVParser.DEFAULT_ESCAPE_CHARACTER, DEFAULT_SKIP_LINES,
- CSVParser.DEFAULT_STRICT_QUOTES);
- }
-
- /**
- * Constructs CSVReader with supplied separator, quote char and quote handling
- * behavior.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param strictQuotes sets if characters outside the quotes are ignored
- */
- public CSVReader(Reader reader, char separator, char quotechar, boolean strictQuotes) {
- this(reader, separator, quotechar, CSVParser.DEFAULT_ESCAPE_CHARACTER, DEFAULT_SKIP_LINES,
- strictQuotes);
- }
-
- /**
- * Constructs CSVReader.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- */
-
- public CSVReader(Reader reader, char separator, char quotechar, char escape) {
- this(reader, separator, quotechar, escape, DEFAULT_SKIP_LINES, CSVParser.DEFAULT_STRICT_QUOTES);
- }
-
- /**
- * Constructs CSVReader.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param line the line number to skip for start reading
- */
- public CSVReader(Reader reader, char separator, char quotechar, int line) {
- this(reader, separator, quotechar, CSVParser.DEFAULT_ESCAPE_CHARACTER, line,
- CSVParser.DEFAULT_STRICT_QUOTES);
- }
-
- /**
- * Constructs CSVReader.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param line the line number to skip for start reading
- */
- public CSVReader(Reader reader, char separator, char quotechar, char escape, int line) {
- this(reader, separator, quotechar, escape, line, CSVParser.DEFAULT_STRICT_QUOTES);
- }
-
- /**
- * Constructs CSVReader.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param line the line number to skip for start reading
- * @param strictQuotes sets if characters outside the quotes are ignored
- */
- public CSVReader(Reader reader, char separator, char quotechar, char escape, int line,
- boolean strictQuotes) {
- this(reader, separator, quotechar, escape, line, strictQuotes,
- CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE);
- }
-
- /**
- * Constructs CSVReader with all data entered.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param line the line number to skip for start reading
- * @param strictQuotes sets if characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace it true, parser should ignore white space before a quote
- * in a field
- */
- public CSVReader(Reader reader, char separator, char quotechar, char escape, int line,
- boolean strictQuotes, boolean ignoreLeadingWhiteSpace) {
- this(reader, line,
- new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace));
- }
-
- /**
- * Constructs CSVReader with all data entered.
- *
- * @param reader the reader to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escape the character to use for escaping a separator or quote
- * @param line the line number to skip for start reading
- * @param strictQuotes sets if characters outside the quotes are ignored
- * @param ignoreLeadingWhiteSpace if true, parser should ignore white space before a quote
- * in a field
- * @param keepCR if true the reader will keep carriage returns,
- * otherwise it will discard them.
- */
- public CSVReader(Reader reader, char separator, char quotechar, char escape, int line,
- boolean strictQuotes, boolean ignoreLeadingWhiteSpace, boolean keepCR) {
- this(reader, line,
- new CSVParser(separator, quotechar, escape, strictQuotes, ignoreLeadingWhiteSpace), keepCR,
- DEFAULT_VERIFY_READER);
- }
-
- /**
- * Constructs CSVReader with supplied CSVParser.
- *
- * @param reader the reader to an underlying CSV source.
- * @param line the line number to skip for start reading
- * @param csvParser the parser to use to parse input
- */
- public CSVReader(Reader reader, int line, CSVParser csvParser, boolean multiLine) {
- this(reader, line, csvParser);
- this.multiLine = multiLine;
- }
-
- public CSVReader(Reader reader, int line, CSVParser csvParser) {
- this(reader, line, csvParser, DEFAULT_KEEP_CR, DEFAULT_VERIFY_READER);
- }
-
- /**
- * Constructs CSVReader with supplied CSVParser.
- *
- * @param reader the reader to an underlying CSV source.
- * @param line the line number to skip for start reading
- * @param csvParser the parser to use to parse input
- * @param keepCR true to keep carriage returns in data read, false otherwise
- * @param verifyReader true to verify reader before each read, false otherwise
- */
- CSVReader(Reader reader, int line, CSVParser csvParser, boolean keepCR, boolean verifyReader) {
- this.br = (reader instanceof BufferedReader ?
- (BufferedReader) reader :
- new BufferedReader(reader, 30720));
- this.lineReader = new LineReader(br, keepCR);
- this.skipLines = line;
- this.parser = csvParser;
- this.keepCR = keepCR;
- this.verifyReader = verifyReader;
- }
-
- /**
- * @return the CSVParser used by the reader.
- */
- public CSVParser getParser() {
- return parser;
- }
-
- /**
- * Returns the number of lines in the csv file to skip before processing. This is
- * useful when there is miscellaneous data at the beginning of a file.
- *
- * @return the number of lines in the csv file to skip before processing.
- */
- public int getSkipLines() {
- return skipLines;
- }
-
- /**
- * Returns if the reader will keep carriage returns found in data or remove them.
- *
- * @return true if reader will keep carriage returns, false otherwise.
- */
- public boolean keepCarriageReturns() {
- return keepCR;
- }
-
- /**
- * Reads the entire file into a List with each element being a String[] of
- * tokens.
- *
- * @return a List of String[], with each String[] representing a line of the
- * file.
- * @throws IOException if bad things happen during the read
- */
- public List<String[]> readAll() throws IOException {
-
- //CHECKSTYLE:OFF Approval No:Approval-V1R2C10_005
- List<String[]> allElements = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- while (hasNext) {
- String[] nextLineAsTokens = readNext();
- if (nextLineAsTokens != null) {
- allElements.add(nextLineAsTokens);
- }
- }
- return allElements;
- //CHECKSTYLE:ON
- }
-
- /**
- * Reads the next line from the buffer and converts to a string array.
- *
- * @return a string array with each comma-separated element as a separate
- * entry.
- * @throws IOException if bad things happen during the read
- */
- public String[] readNext() throws IOException {
-
- if (this.multiLine) {
- return readMultiLine();
- } else {
- return readSingleLine();
- }
- }
-
- private String[] readSingleLine() throws IOException {
- String[] result = null;
- String nextLine = getNextLine();
- try {
- this.lineNum += 1L;
- result = parser.parseLine(nextLine);
- } catch (IOException e) {
- if ("Un-terminated quoted field at end of CSV line".equals(e.getMessage())) {
- badRecordslogger.addBadRecordsToBilder(new String[] { nextLine }, 1,
- "Un-terminated quoted field at end of CSV line", null);
- LOGGER.info("Found Un-terminated quote @ line [" + this.lineNum + "] : Skipping Line : "
- + nextLine);
- this.skippedLines += 1L;
- result = readNext();
- } else {
- throw e;
- }
- }
- if (null == nextLine) {
- LOGGER.info("Total Number of Lines : " + --this.lineNum);
- LOGGER.info("Number of Lines Skipped: " + this.skippedLines);
- // System.out.println("Total Number of Lines : "+ --this.lineNum);
- // System.out.println("Number of Lines Skipped: "+ this.skippedLines);
- }
- return result;
- }
-
- private String[] readMultiLine() throws IOException {
- int linesread = 0;
- String[] result = null;
- String firstLine = null;
- do {
- this.lineNum += 1L;
- linesread++;
- if (linesread == 2) {
- br.mark(12000);
- }
- String nextLine = getNextLine();
- if (!hasNext) {
- LOGGER.info("Total Number of Lines : " + --this.lineNum);
- LOGGER.info("Number of Lines Skipped: " + this.skippedLines);
- // System.out.println("Total Number of Lines : "+ --this.lineNum);
- // System.out.println("Number of Lines Skipped: "+ this.skippedLines);
- return result; // should throw if still pending?
- }
- try {
- String[] r = parser.parseLineMulti(nextLine);
- if (r.length > 0) {
- if (result == null) {
- result = r;
- } else {
- result = combineResultsFromMultipleReads(result, r);
- }
- }
- } catch (IOException e) {
- if ("Un-terminated quoted field after 10000 characters".equals(e.getMessage())) {
- LOGGER.info("Un-terminated quoted field found after 10000 characters in MultiLine "
- + "(No. Of Line searched : " + linesread + " ) starting from Line :" + (
- this.lineNum - linesread + 1));
- LOGGER.info("Skipped Line Info : " + firstLine);
- parser.setPending(null);
- this.skippedLines += 1;
- this.lineNum += (1 - linesread);
- if (linesread > 1) {
- br.reset();
- }
- int resLength = result != null ? result.length : 0;
- badRecordslogger.addBadRecordsToBilder(result, resLength,
- "Un-terminated quoted field after 10000 characters", null);
- result = readNext();
- } else {
- throw e;
- }
- }
- if (linesread == 1) {
- firstLine = nextLine;
- }
- // String[] r = parser.parseLine(nextLine);
- } while (parser.isPending());
- return result;
- }
-
- /**
- * For multi line records this method combines the current result with the result
- * from previous read(s).
- *
- * @param buffer - previous data read for this record
- * @param lastRead - latest data read for this record.
- * @return String array with union of the buffer and lastRead arrays.
- */
- private String[] combineResultsFromMultipleReads(String[] buffer, String[] lastRead) {
- String[] t = new String[buffer.length + lastRead.length];
- System.arraycopy(buffer, 0, t, 0, buffer.length);
- System.arraycopy(lastRead, 0, t, buffer.length, lastRead.length);
- return t;
- }
-
- /**
- * Reads the next line from the file.
- *
- * @return the next line from the file without trailing newline
- * @throws IOException if bad things happen during the read
- */
- public String getNextLine() throws IOException {
- if (isClosed()) {
- hasNext = false;
- return null;
- }
-
- if (!this.linesSkiped) {
- for (int i = 0; i < skipLines; i++) {
- lineReader.readLine();
- }
- this.linesSkiped = true;
- }
- String nextLine = lineReader.readLine();
- if (nextLine == null) {
- hasNext = false;
- }
- return hasNext ? nextLine : null;
- }
-
- /**
- * Checks to see if the file is closed.
- *
- * @return true if the reader can no longer be read from.
- */
- private boolean isClosed() {
- if (!verifyReader) {
- return false;
- }
- try {
- return !br.ready();
- } catch (IOException e) {
- return true;
- }
- }
-
- /**
- * Closes the underlying reader.
- *
- * @throws IOException if the close fails
- */
- public void close() throws IOException {
- br.close();
- }
-
- /**
- * Creates an Iterator for processing the csv data.
- *
- * @return an String[] iterator.
- */
- public Iterator<String[]> iterator() {
- try {
- return new CSVIterator(this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Returns if the CSVReader will verify the reader before each read.
- * By default the value is true which is the functionality for version 3.0.
- * If set to false the reader is always assumed ready to read - this is the functionality
- * for version 2.4 and before.
- * The reason this method was needed was that certain types of Readers would return
- * false for its ready() method until a read was done (namely readers created using Channels).
- * This caused opencsv not to read from those readers.
- *
- * @return true if CSVReader will verify the reader before reads. False otherwise.
- * @link https://sourceforge.net/p/opencsv/bugs/108/
- */
- public boolean verifyReader() {
- return this.verifyReader;
- }
-
- public void setBadRecordsLogger(BadRecordslogger badRecordslogger) {
- this.badRecordslogger = badRecordslogger;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVWriter.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVWriter.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVWriter.java
deleted file mode 100644
index 68dbd59..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/CSVWriter.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-
-/**
- * Copyright 2005 Bytecode Pty Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-/**
- * A very simple CSV writer released under a commercial-friendly license.
- *
- * @author Glen Smith
- */
-public class CSVWriter implements Closeable, Flushable {
-
- public static final int INITIAL_STRING_SIZE = 128;
- /**
- * The character used for escaping quotes.
- */
- public static final char DEFAULT_ESCAPE_CHARACTER = '"';
- /**
- * The default separator to use if none is supplied to the constructor.
- */
- public static final char DEFAULT_SEPARATOR = ',';
- /**
- * The default quote character to use if none is supplied to the
- * constructor.
- */
- public static final char DEFAULT_QUOTE_CHARACTER = '"';
- /**
- * The quote constant to use when you wish to suppress all quoting.
- */
- public static final char NO_QUOTE_CHARACTER = '\u0000';
- /**
- * The escape constant to use when you wish to suppress all escaping.
- */
- public static final char NO_ESCAPE_CHARACTER = '\u0000';
- /**
- * Default line terminator uses platform encoding.
- */
- public static final String DEFAULT_LINE_END = "\n";
-
- public static final String CARRIAGE_RETURN = "\r";
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CSVWriter.class.getName());
-
- private Writer rawWriter;
- private PrintWriter pw;
- private char separator;
- private char quotechar;
- private char escapechar;
- private String lineEnd;
- private ResultSetHelper resultService = new ResultSetHelperService();
-
- /**
- * Constructs CSVWriter using a comma for the separator.
- *
- * @param writer the writer to an underlying CSV source.
- */
- public CSVWriter(Writer writer) {
- this(writer, DEFAULT_SEPARATOR);
- }
-
- /**
- * Constructs CSVWriter with supplied separator.
- *
- * @param writer the writer to an underlying CSV source.
- * @param separator the delimiter to use for separating entries.
- */
- public CSVWriter(Writer writer, char separator) {
- this(writer, separator, DEFAULT_QUOTE_CHARACTER);
- }
-
- /**
- * Constructs CSVWriter with supplied separator and quote char.
- *
- * @param writer the writer to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- */
- public CSVWriter(Writer writer, char separator, char quotechar) {
- this(writer, separator, quotechar, DEFAULT_ESCAPE_CHARACTER);
- }
-
- /**
- * Constructs CSVWriter with supplied separator and quote char.
- *
- * @param writer the writer to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escapechar the character to use for escaping quotechars or escapechars
- */
- public CSVWriter(Writer writer, char separator, char quotechar, char escapechar) {
- this(writer, separator, quotechar, escapechar, DEFAULT_LINE_END);
- }
-
- /**
- * Constructs CSVWriter with supplied separator and quote char.
- *
- * @param writer the writer to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param lineEnd the line feed terminator to use
- */
- public CSVWriter(Writer writer, char separator, char quotechar, String lineEnd) {
- this(writer, separator, quotechar, DEFAULT_ESCAPE_CHARACTER, lineEnd);
- }
-
- /**
- * Constructs CSVWriter with supplied separator, quote char, escape char and line ending.
- *
- * @param writer the writer to an underlying CSV source.
- * @param separator the delimiter to use for separating entries
- * @param quotechar the character to use for quoted elements
- * @param escapechar the character to use for escaping quotechars or escapechars
- * @param lineEnd the line feed terminator to use
- */
- public CSVWriter(Writer writer, char separator, char quotechar, char escapechar, String lineEnd) {
- this.rawWriter = writer;
- this.pw = new PrintWriter(writer);
- this.separator = separator;
- this.quotechar = quotechar;
- this.escapechar = escapechar;
- this.lineEnd = lineEnd;
- }
-
- /**
- * Writes the entire list to a CSV file. The list is assumed to be a
- * String[]
- *
- * @param allLines a List of String[], with each String[] representing a line of
- * the file.
- * @param applyQuotesToAll true if all values are to be quoted. false if quotes only
- * to be applied to values which contain the separator, escape,
- * quote or new line characters.
- */
- public void writeAll(List<String[]> allLines, boolean applyQuotesToAll) {
- for (String[] line : allLines) {
- writeNext(line, applyQuotesToAll);
- }
- }
-
- /**
- * Writes the entire list to a CSV file. The list is assumed to be a
- * String[]
- *
- * @param allLines a List of String[], with each String[] representing a line of
- * the file.
- */
- public void writeAll(List<String[]> allLines) {
- for (String[] line : allLines) {
- writeNext(line);
- }
- }
-
- /**
- * Writes the column names.
- *
- * @param rs - ResultSet containing column names.
- * @throws SQLException - thrown by ResultSet::getColumnNames
- */
- protected void writeColumnNames(ResultSet rs) throws SQLException {
-
- writeNext(resultService.getColumnNames(rs));
- }
-
- /**
- * Writes the entire ResultSet to a CSV file.
- * The caller is responsible for closing the ResultSet.
- *
- * @param rs the result set to write
- * @param includeColumnNames true if you want column names in the output, false otherwise
- * @throws IOException thrown by getColumnValue
- * @throws SQLException thrown by getColumnValue
- */
- public void writeAll(ResultSet rs, boolean includeColumnNames) throws SQLException, IOException {
- writeAll(rs, includeColumnNames, false);
- }
-
- /**
- * Writes the entire ResultSet to a CSV file.
- * The caller is responsible for closing the ResultSet.
- *
- * @param rs the Result set to write.
- * @param includeColumnNames include the column names in the output.
- * @param trim remove spaces from the data before writing.
- * @throws IOException thrown by getColumnValue
- * @throws SQLException thrown by getColumnValue
- */
- public void writeAll(ResultSet rs, boolean includeColumnNames, boolean trim)
- throws SQLException, IOException {
-
- if (includeColumnNames) {
- writeColumnNames(rs);
- }
-
- while (rs.next()) {
- writeNext(resultService.getColumnValues(rs, trim));
- }
- }
-
- /**
- * Writes the next line to the file.
- *
- * @param nextLine a string array with each comma-separated element as a separate
- * entry.
- * @param applyQuotesToAll true if all values are to be quoted. false applies quotes only
- * to values which contain the separator, escape, quote or new line
- * characters.
- */
- public void writeNext(String[] nextLine, boolean applyQuotesToAll) {
-
- if (nextLine == null) {
- return;
- }
-
- StringBuilder sb = new StringBuilder(INITIAL_STRING_SIZE);
- for (int i = 0; i < nextLine.length; i++) {
-
- if (i != 0) {
- sb.append(separator);
- }
-
- String nextElement = nextLine[i];
-
- if (nextElement == null) {
- continue;
- }
-
- Boolean stringContainsSpecialCharacters = stringContainsSpecialCharacters(nextElement);
-
- if ((applyQuotesToAll || stringContainsSpecialCharacters)
- && quotechar != NO_QUOTE_CHARACTER) {
- sb.append(quotechar);
- }
-
- if (stringContainsSpecialCharacters) {
- sb.append(processLine(nextElement));
- } else {
- sb.append(nextElement);
- }
-
- if ((applyQuotesToAll || stringContainsSpecialCharacters)
- && quotechar != NO_QUOTE_CHARACTER) {
- sb.append(quotechar);
- }
- }
-
- sb.append(lineEnd);
- pw.write(sb.toString());
- }
-
- /**
- * Writes the next line to the file.
- *
- * @param nextLine a string array with each comma-separated element as a separate
- * entry.
- */
- public void writeNext(String[] nextLine) {
- writeNext(nextLine, true);
- }
-
- /**
- * checks to see if the line contains special characters.
- *
- * @param line - element of data to check for special characters.
- * @return true if the line contains the quote, escape, separator, newline or return.
- */
- private boolean stringContainsSpecialCharacters(String line) {
- return line.indexOf(quotechar) != -1 || line.indexOf(escapechar) != -1
- || line.indexOf(separator) != -1 || line.contains(DEFAULT_LINE_END) || line
- .contains(CARRIAGE_RETURN);
- }
-
- /**
- * Processes all the characters in a line.
- *
- * @param nextElement - element to process.
- * @return a StringBuilder with the elements data.
- */
- protected StringBuilder processLine(String nextElement) {
- StringBuilder sb = new StringBuilder(INITIAL_STRING_SIZE);
- for (int j = 0; j < nextElement.length(); j++) {
- char nextChar = nextElement.charAt(j);
- processCharacter(sb, nextChar);
- }
-
- return sb;
- }
-
- /**
- * Appends the character to the StringBuilder adding the escape character if needed.
- *
- * @param sb - StringBuffer holding the processed character.
- * @param nextChar - character to process
- */
- private void processCharacter(StringBuilder sb, char nextChar) {
- if (escapechar != NO_ESCAPE_CHARACTER && (nextChar == quotechar || nextChar == escapechar)) {
- sb.append(escapechar).append(nextChar);
- } else {
- sb.append(nextChar);
- }
- }
-
- /**
- * Flush underlying stream to writer.
- *
- * @throws IOException if bad things happen
- */
- public void flush() throws IOException {
-
- pw.flush();
-
- }
-
- /**
- * Close the underlying stream writer flushing any buffered content.
- *
- * @throws IOException if bad things happen
- */
- public void close() throws IOException {
- flush();
- pw.close();
- rawWriter.close();
- }
-
- /**
- * Checks to see if the there has been an error in the printstream.
- *
- * @return <code>true</code> if the print stream has encountered an error,
- * either on the underlying output stream or during a format
- * conversion.
- */
- public boolean checkError() {
- return pw.checkError();
- }
-
- /**
- * Sets the result service.
- *
- * @param resultService - the ResultSetHelper
- */
- public void setResultService(ResultSetHelper resultService) {
- this.resultService = resultService;
- }
-
- /**
- * flushes the writer without throwing any exceptions.
- */
- public void flushQuietly() {
- try {
- flush();
- } catch (IOException e) {
- LOGGER.debug("Error while flushing");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a1b3ad3b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/LineReader.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/LineReader.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/LineReader.java
deleted file mode 100644
index 3ee6228..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/reader/LineReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.reader;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-
-/**
- * This class was created for issue #106 (https://sourceforge.net/p/opencsv/bugs/106/) where
- * carriage returns were being removed. This class allows the user to determine if they wish to
- * keep or remove them from the data being read.
- * Created by scott on 2/19/15.
- */
-
-public class LineReader {
- private BufferedReader reader;
- private boolean keepCarriageReturns;
-
- /**
- * LineReader constructor.
- *
- * @param reader - Reader that data will be read from.
- * @param keepCarriageReturns - true if carriage returns should remain in the data, false
- * to remove them.
- */
- public LineReader(BufferedReader reader, boolean keepCarriageReturns) {
- this.reader = reader;
- this.keepCarriageReturns = keepCarriageReturns;
- }
-
- /**
- * Reads the next line from the Reader.
- *
- * @return - Line read from reader.
- * @throws IOException - on error from BufferedReader
- */
- public String readLine() throws IOException {
- return keepCarriageReturns ? readUntilNewline() : reader.readLine();
- }
-
- private String readUntilNewline() throws IOException {
- StringBuilder sb = new StringBuilder(CSVParser.INITIAL_READ_SIZE);
- for (int c = reader.read();
- c > -1 && c != '\n';
- c = reader.read()) { //CHECKSTYLE:OFF Approval No:Approval-V1R2C10_006
- sb.append((char) c);
- }//CHECKSTYLE:ON
-
- return sb.length() > 0 ? sb.toString() : null;
- }
-}