Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:49 UTC
[11/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..d2e716f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.Partition;
+import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
+import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
+import org.apache.carbondata.spark.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility converts a Carbon query plan into the actual query model objects.
+ */
+public final class CarbonQueryUtil {
+
+ private CarbonQueryUtil() {
+
+ }
+
+ /**
+ * Creates one split for each region server.
+ */
+ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+ CarbonQueryPlan queryPlan) throws IOException {
+
+ // Just create splits depending on the locations of the region servers
+ List<Partition> allPartitions = null;
+ if (queryPlan == null) {
+ allPartitions =
+ QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+ } else {
+ allPartitions =
+ QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+ }
+ TableSplit[] splits = new TableSplit[allPartitions.size()];
+ for (int i = 0; i < splits.length; i++) {
+ splits[i] = new TableSplit();
+ List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Partition partition = allPartitions.get(i);
+ String location = QueryPartitionHelper.getInstance()
+ .getLocation(partition, databaseName, tableName);
+ locations.add(location);
+ splits[i].setPartition(partition);
+ splits[i].setLocations(locations);
+ }
+
+ return splits;
+ }
+
+ /**
+ * Creates one split for each region server.
+ */
+ public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+
+ // Just create splits depending on the locations of the region servers
+ DefaultLoadBalancer loadBalancer = null;
+ List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+ loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+ TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+ for (int i = 0; i < tblSplits.length; i++) {
+ tblSplits[i] = new TableSplit();
+ List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Partition partition = allPartitions.get(i);
+ String location = loadBalancer.getNodeForPartitions(partition);
+ locations.add(location);
+ tblSplits[i].setPartition(partition);
+ tblSplits[i].setLocations(locations);
+ }
+ return tblSplits;
+ }
+
+ /**
+ * Splits sourcePath using the given separator.
+ */
+ public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+ String separator) {
+ if (StringUtils.isNotEmpty(sourcePath)) {
+ String[] files = sourcePath.split(separator);
+ for (String file : files) {
+ partitionsFiles.add(file);
+ }
+ }
+ }
+
+ private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+ List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+ List<Partition> partitionList =
+ new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
+
+ partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+ partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
+
+ // i % 1 is always 0, so every file lands in the single partition created above
+ for (int i = 0; i < files.size(); i++) {
+ partitionFiles.get(i % 1).add(files.get(i));
+ }
+ return partitionList;
+ }
+
+ public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
+ List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ if (null != details) {
+ for (LoadMetadataDetails oneLoad : details) {
+ if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
+ String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
+ slices.add(loadName);
+ }
+ }
+ }
+ return slices;
+ }
+
+}
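
A rough usage sketch (Scala, illustrative rather than part of the commit): splitFilePath appends the separator-delimited parts of a path into the supplied list, and getListOfSlices turns load metadata into LOAD_FOLDER-prefixed slice names for every non-failed load. The empty loadDetails array below is a hypothetical stand-in for metadata normally read from the table's status file.

    import java.util.ArrayList
    import org.apache.carbondata.core.load.LoadMetadataDetails
    import org.apache.carbondata.spark.util.CarbonQueryUtil

    val files = new ArrayList[String]()
    // appends "/data/a.csv" and "/data/b.csv" to files
    CarbonQueryUtil.splitFilePath("/data/a.csv,/data/b.csv", files, ",")

    // hypothetical: normally obtained from the table's load metadata
    val loadDetails: Array[LoadMetadataDetails] = Array()
    val slices = CarbonQueryUtil.getListOfSlices(loadDetails)
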
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
new file mode 100644
index 0000000..11cf9f8
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Utility to inspect load metadata, e.g. to decide whether stale segments require deletion.
+ */
+package org.apache.carbondata.spark.util;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+
+public final class LoadMetadataUtil {
+ private LoadMetadataUtil() {
+
+ }
+
+ public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
+ CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+ .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+
+ String metaDataLocation = table.getMetaDataFilepath();
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ if (details != null && details.length != 0) {
+ for (LoadMetadataDetails oneRow : details) {
+ if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
+ || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
+ && oneRow.getVisibility().equalsIgnoreCase("true")) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+
+ }
+}
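
A minimal sketch (Scala, assumptions noted inline) of how a housekeeping step might gate segment cleanup on this check; the CarbonLoadModel is assumed to be populated elsewhere:

    import org.apache.carbondata.processing.model.CarbonLoadModel
    import org.apache.carbondata.spark.util.LoadMetadataUtil

    def cleanupIfNeeded(loadModel: CarbonLoadModel): Unit = {
      // true when any visible segment is marked for delete or already compacted
      if (LoadMetadataUtil.isLoadDeletionRequired(loadModel)) {
        // physical deletion of the stale segments would go here (not shown)
      }
    }
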
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
new file mode 100644
index 0000000..0b652ef
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
+
+case class CarbonAliasDecoderRelation() {
+
+ val attrMap = new java.util.HashMap[AttributeReferenceWrapper, Attribute]
+
+ def put(key: Attribute, value: Attribute): Unit = {
+ attrMap.put(AttributeReferenceWrapper(key), value)
+ }
+
+ def getOrElse(key: Attribute, default: Attribute): Attribute = {
+ val value = attrMap.get(AttributeReferenceWrapper(key))
+ if (value == null) {
+ default
+ } else {
+ if (value.equals(key)) {
+ value
+ } else {
+ getOrElse(value, value)
+ }
+ }
+ }
+}
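
The recursive getOrElse walks an alias chain until it reaches an attribute that no longer maps to anything else. A small sketch, assuming Spark catalyst AttributeReferences (the attribute names are illustrative):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.StringType

    import org.apache.carbondata.spark.CarbonAliasDecoderRelation

    val a = AttributeReference("a", StringType)()
    val b = AttributeReference("b", StringType)()
    val c = AttributeReference("c", StringType)()

    val decoder = CarbonAliasDecoderRelation()
    decoder.put(a, b) // a aliases b
    decoder.put(b, c) // b aliases c
    decoder.getOrElse(a, a) // follows the chain a -> b -> c and returns c
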
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
new file mode 100644
index 0000000..ea97bca
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+ /**
+ * Carbon column validator
+ */
+class CarbonColumnValidator extends ColumnValidator {
+ def validateColumns(allColumns: Seq[ColumnSchema]) {
+ allColumns.foreach { columnSchema =>
+ val colWithSameId = allColumns.filter { x =>
+ x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
+ }
+ if (colWithSameId.size > 1) {
+ throw new MalformedCarbonCommandException("Two columns cannot have the same columnId")
+ }
+ }
+ }
+}
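
For illustration, the failure path in a hedged sketch; the bean-style setters on ColumnSchema (setColumnName, setColumnUniqueId) are assumed to exist alongside the getColumnUniqueId used above:

    import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema

    val col1 = new ColumnSchema()
    col1.setColumnName("id")
    col1.setColumnUniqueId("1") // assumed setter
    val col2 = new ColumnSchema()
    col2.setColumnName("id_copy")
    col2.setColumnUniqueId("1") // duplicate unique id

    // throws MalformedCarbonCommandException: two columns share a columnId
    new CarbonColumnValidator().validateColumns(Seq(col1, col2))
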
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
new file mode 100644
index 0000000..2a580dc
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.scan.expression.conditional._
+import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * All filter conversions are done here.
+ */
+object CarbonFilters {
+
+ /**
+ * Converts data sources filters to carbon filter predicates.
+ */
+ def createCarbonFilter(schema: StructType,
+ predicate: sources.Filter): Option[CarbonExpression] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+ predicate match {
+
+ case sources.EqualTo(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+
+ case sources.EqualNullSafe(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+
+ case sources.GreaterThan(name, value) =>
+ Some(new GreaterThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThan(name, value) =>
+ Some(new LessThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThanOrEqual(name, value) =>
+ Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThanOrEqual(name, value) =>
+ Some(new LessThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+
+ case sources.In(name, values) =>
+ Some(new InExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ case sources.Not(sources.In(name, values)) =>
+ Some(new NotInExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+
+ case sources.And(lhs, rhs) =>
+ (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(lhs)
+ rhsFilter <- createFilter(rhs)
+ } yield {
+ new OrExpression(lhsFilter, rhsFilter)
+ }
+
+ case _ => None
+ }
+ }
+
+ def getCarbonExpression(name: String) = {
+ new CarbonColumnExpression(name,
+ CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+ }
+
+ def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+ new CarbonLiteralExpression(value,
+ CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+ }
+
+ createFilter(predicate)
+ }
+
+
+ // Determine which filters can be pushed down to Carbon; the rest are handled in the Spark layer.
+ // Mostly only dimension filters are pushed down, since they are faster to evaluate in Carbon.
+ def selectFilters(filters: Seq[Expression],
+ attrList: java.util.HashSet[AttributeReferenceWrapper],
+ aliasMap: CarbonAliasDecoderRelation): Unit = {
+ def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
+ expr match {
+ case or@ Or(left, right) =>
+
+ val leftFilter = translate(left, or = true)
+ val rightFilter = translate(right, or = true)
+ if (leftFilter.isDefined && rightFilter.isDefined) {
+ Some( sources.Or(leftFilter.get, rightFilter.get))
+ } else {
+ or.collect {
+ case attr: AttributeReference =>
+ attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+ }
+ None
+ }
+
+ case And(left, right) =>
+ (translate(left) ++ translate(right)).reduceOption(sources.And)
+
+ case EqualTo(a: Attribute, Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(l@Literal(v, t), a: Attribute) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.EqualTo(a.name, v))
+
+ case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+ case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+ case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.Not(sources.In(a.name, hSet.toArray)))
+ case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.In(a.name, hSet.toArray))
+ case Not(In(Cast(a: Attribute, _), list))
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.Not(sources.In(a.name, hSet.toArray)))
+ case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.In(a.name, hSet.toArray))
+
+ case GreaterThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, v))
+ case GreaterThan(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThan(a.name, v))
+ case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, v))
+ case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.LessThan(a.name, v))
+
+ case LessThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThan(a.name, v))
+ case LessThan(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThan(a.name, v))
+ case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.LessThan(a.name, v))
+ case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.GreaterThan(a.name, v))
+
+ case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+
+ case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+
+ case others =>
+ if (!or) {
+ others.collect {
+ case attr: AttributeReference =>
+ attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+ }
+ }
+ None
+ }
+ }
+ // translate() is called for its side effect of populating attrList;
+ // the resulting array of source filters is discarded.
+ filters.flatMap(translate(_, false)).toArray
+ }
+
+ def processExpression(exprs: Seq[Expression],
+ attributesNeedToDecode: java.util.HashSet[AttributeReference],
+ unprocessedExprs: ArrayBuffer[Expression],
+ carbonTable: CarbonTable): Option[CarbonExpression] = {
+ def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
+ expr match {
+ case or@ Or(left, right) =>
+ val leftFilter = transformExpression(left, true)
+ val rightFilter = transformExpression(right, true)
+ if (leftFilter.isDefined && rightFilter.isDefined) {
+ Some(new OrExpression(leftFilter.get, rightFilter.get))
+ } else {
+ or.collect {
+ case attr: AttributeReference => attributesNeedToDecode.add(attr)
+ }
+ unprocessedExprs += or
+ None
+ }
+
+ case And(left, right) =>
+ (transformExpression(left) ++ transformExpression(right))
+ .reduceOption(new AndExpression(_, _))
+
+ case EqualTo(a: Attribute, l@Literal(v, t)) =>
+ Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+ case EqualTo(l@Literal(v, t), a: Attribute) =>
+ Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+ case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
+ Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+ case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+
+ case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
+ Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+ case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
+ Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+ case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
+ Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+ case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
+ Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+ case IsNotNull(child: Attribute) =>
+ Some(new NotEqualsExpression(transformExpression(child).get,
+ transformExpression(Literal(null)).get, true))
+ case IsNull(child: Attribute) =>
+ Some(new EqualToExpression(transformExpression(child).get,
+ transformExpression(Literal(null)).get, true))
+ case Not(In(a: Attribute, list))
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get,
+ new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+ }
+ case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ Some(new InExpression(transformExpression(a).get,
+ new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+ case Not(In(Cast(a: Attribute, _), list))
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ /* If an illogical expression such as NOT IN('scala', NULL) appears,
+ it is treated as a false expression and the filter
+ always returns no result. */
+ if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+ Some(new FalseExpression(transformExpression(a).get))
+ } else {
+ Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+ convertToJavaList(list.map(transformExpression(_).get)))))
+ }
+ case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ Some(new InExpression(transformExpression(a).get,
+ new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+
+ case GreaterThan(a: Attribute, l@Literal(v, t)) =>
+ Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case GreaterThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+ Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case GreaterThan(l@Literal(v, t), a: Attribute) =>
+ Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case GreaterThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+ case LessThan(a: Attribute, l@Literal(v, t)) =>
+ Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case LessThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+ Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case LessThan(l@Literal(v, t), a: Attribute) =>
+ Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+ case LessThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+ case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+ Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case GreaterThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+ Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
+ Some(new LessThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case GreaterThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(new LessThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+
+ case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+ Some(new LessThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case LessThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+ Some(new LessThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
+ Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+ case LessThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+ transformExpression(l).get))
+
+ case AttributeReference(name, dataType, _, _) =>
+ Some(new CarbonColumnExpression(name,
+ CarbonScalaUtil.convertSparkToCarbonDataType(
+ getActualCarbonDataType(name, carbonTable))))
+ case Literal(name, dataType) =>
+ Some(new CarbonLiteralExpression(name,
+ CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+ case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
+ case others =>
+ if (!or) {
+ others.collect {
+ case attr: AttributeReference => attributesNeedToDecode.add(attr)
+ }
+ unprocessedExprs += others
+ }
+ None
+ }
+ }
+ exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+ }
+ private def isNullLiteral(exp: Expression): Boolean = {
+ // parenthesized so that || cannot bypass the null and Literal guards
+ null != exp && exp.isInstanceOf[Literal] &&
+ (exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType ||
+ exp.asInstanceOf[Literal].value == null)
+ }
+ private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
+ var carbonColumn: CarbonColumn =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
+ val dataType = if (carbonColumn != null) {
+ carbonColumn.getDataType
+ } else {
+ carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
+ carbonColumn.getDataType match {
+ case DataType.INT => DataType.LONG
+ case DataType.LONG => DataType.LONG
+ case DataType.DECIMAL => DataType.DECIMAL
+ case _ => DataType.DOUBLE
+ }
+ }
+ CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
+ }
+
+ // Convert a Scala list to a Java list. We cannot use scalaList.asJava because, while
+ // deserializing, the classes inside the Scala wrapper cannot be found and a
+ // ClassNotFoundException is thrown.
+ private def convertToJavaList(
+ scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+ val javaList = new java.util.ArrayList[CarbonExpression]()
+ scalaList.foreach(javaList.add)
+ javaList
+ }
+}
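
A brief sketch of the data-source path, using Spark's public sources.Filter API against an illustrative two-column schema; predicates with no matching case fall through to None and stay in the Spark layer:

    import org.apache.spark.sql.sources
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    import org.apache.carbondata.spark.CarbonFilters

    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Some(EqualToExpression) comparing column "name" with literal "carbon"
    val pushed = CarbonFilters.createCarbonFilter(schema, sources.EqualTo("name", "carbon"))
    // None: there is no case for StringContains, so Spark evaluates it itself
    val kept = CarbonFilters.createCarbonFilter(schema, sources.StringContains("name", "car"))
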
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
new file mode 100644
index 0000000..a0503c7
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Contains all options for Spark data source
+ */
+class CarbonOption(options: Map[String, String]) {
+ def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
+
+ def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+
+ def tableName: String = options.getOrElse("tableName", "default_table")
+
+ def tablePath: String = s"$dbName/$tableName"
+
+ def tableId: String = options.getOrElse("tableId", "default_table_id")
+
+ def partitionCount: String = options.getOrElse("partitionCount", "1")
+
+ def partitionClass: String = {
+ options.getOrElse("partitionClass",
+ "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
+ }
+
+ def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
+
+ def compress: Boolean = options.getOrElse("compress", "false").toBoolean
+
+ def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+}
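
For example, given an options map from a Spark data source call (values illustrative), the accessors resolve as:

    import org.apache.carbondata.spark.CarbonOption

    val opts = new CarbonOption(Map("dbName" -> "sales", "tableName" -> "orders"))
    opts.tableName      // "orders"
    opts.tablePath      // "sales/orders"
    opts.partitionCount // "1" (default)
    opts.tempCSV        // true (default)
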
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
new file mode 100644
index 0000000..056fcdb
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+
+ /**
+ * Column validator
+ */
+trait ColumnValidator {
+ def validateColumns(columns: Seq[ColumnSchema])
+}
+/**
+ * Dictionary related helper service
+ */
+trait DictionaryDetailService {
+ def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
+ table: CarbonTableIdentifier, storePath: String): DictionaryDetail
+}
+
+/**
+ * Dictionary related detail
+ */
+case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
+ dictFilePaths: Array[String], dictFileExists: Array[Boolean])
+
+/**
+ * Factory class
+ */
+object CarbonSparkFactory {
+ /**
+ * @return column validator
+ */
+ def getCarbonColumnValidator(): ColumnValidator = {
+ new CarbonColumnValidator
+ }
+
+ /**
+ * @return dictionary helper
+ */
+ def getDictionaryDetailService(): DictionaryDetailService = {
+ new DictionaryDetailHelper
+ }
+}
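
A one-line sketch of the indirection the factory provides (illustrative only):

    import org.apache.carbondata.spark.CarbonSparkFactory

    // resolves to the CarbonColumnValidator implementation shown earlier
    val validator = CarbonSparkFactory.getCarbonColumnValidator()
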
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
new file mode 100644
index 0000000..e9541de
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.HashMap
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+
+class DictionaryDetailHelper extends DictionaryDetailService {
+ def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
+ table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
+ val dictFilePaths = new Array[String](primDimensions.length)
+ val dictFileExists = new Array[Boolean](primDimensions.length)
+ val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
+
+ val fileType = FileFactory.getFileType(dictfolderPath)
+ // Metadata folder
+ val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
+ // 1 list all dictionary files so each path can be flagged as existing or not
+ val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+ override def accept(pathname: CarbonFile): Boolean = {
+ CarbonTablePath.isDictionaryFile(pathname)
+ }
+ })
+ // 2 put dictionary file names into fileNamesMap
+ val fileNamesMap = new HashMap[String, Int]
+ for (i <- 0 until carbonFiles.length) {
+ fileNamesMap.put(carbonFiles(i).getName, i)
+ }
+ // 3 look up fileNamesMap: if a dimension's file name is present, its dictionary file exists
+ primDimensions.zipWithIndex.foreach { f =>
+ columnIdentifier(f._2) = f._1.getColumnIdentifier
+ dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+ dictFileExists(f._2) =
+ fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
+ case None => false
+ case Some(_) => true
+ }
+ }
+
+ DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
new file mode 100644
index 0000000..254052b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * It is just a key-value class. I could not find any other way to make the RDD class
+ * work, with my minimal knowledge of Scala.
+ * Maybe I will remove it later once I gain better knowledge :)
+ *
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.load.LoadMetadataDetails
+
+trait Value[V] extends Serializable {
+ def getValue(value: Array[Object]): V
+}
+
+class ValueImpl extends Value[Array[Object]] {
+ override def getValue(value: Array[Object]): Array[Object] = value
+}
+
+trait RawValue[V] extends Serializable {
+ def getValue(value: Array[Any]): V
+}
+
+class RawValueImpl extends RawValue[Array[Any]] {
+ override def getValue(value: Array[Any]): Array[Any] = value
+}
+
+trait DataLoadResult[K, V] extends Serializable {
+ def getKey(key: String, value: LoadMetadataDetails): (K, V)
+}
+
+class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
+ override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
+ (key, value)
+ }
+}
+
+
+trait PartitionResult[K, V] extends Serializable {
+ def getKey(key: Int, value: Boolean): (K, V)
+
+}
+
+class PartitionResultImpl extends PartitionResult[Int, Boolean] {
+ override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait MergeResult[K, V] extends Serializable {
+ def getKey(key: Int, value: Boolean): (K, V)
+
+}
+
+class MergeResultImpl extends MergeResult[Int, Boolean] {
+ override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait DeletedLoadResult[K, V] extends Serializable {
+ def getKey(key: String, value: String): (K, V)
+}
+
+class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
+ override def getKey(key: String, value: String): (String, String) = (key, value)
+}
+
+trait RestructureResult[K, V] extends Serializable {
+ def getKey(key: Int, value: Boolean): (K, V)
+}
+
+class RestructureResultImpl extends RestructureResult[Int, Boolean] {
+ override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
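
Each trait simply pairs its two arguments into a tuple, letting the RDDs stay generic over their key-value types. A trivial sketch:

    import org.apache.carbondata.spark.MergeResultImpl

    val mergeResult = new MergeResultImpl()
    // pairs the two arguments: here a segment index and a status flag
    val (segmentIndex, merged) = mergeResult.getKey(0, true)
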
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
new file mode 100644
index 0000000..551fc9c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.sql.readers
+
+/**
+ * Parser for parsing lines in bulk. Use this when efficiency is desired.
+ *
+ * @param iter iterator over lines in the file
+ * @param fieldSep the delimiter used to separate fields in a line
+ * @param lineSep the delimiter used to separate lines
+ * @param quote character used to quote fields
+ * @param escape character used to escape the quote character
+ * @param ignoreLeadingSpace ignore white space before a field
+ * @param ignoreTrailingSpace ignore white space after a field
+ * @param headers headers for the columns
+ * @param inputBufSize size of buffer to use for parsing input, tune for performance
+ * @param maxCols maximum number of columns allowed, for safety against bad inputs
+ */
+class CarbonBulkCsvReader (iter: Iterator[String],
+ split: Int,
+ fieldSep: Char = ',',
+ lineSep: String = "\n",
+ quote: Char = '"',
+ escape: Char = '\\',
+ commentMarker: Char = '#',
+ ignoreLeadingSpace: Boolean = true,
+ ignoreTrailingSpace: Boolean = true,
+ headers: Seq[String],
+ inputBufSize: Int = 128,
+ maxCols: Int = 20480)
+ extends CsvReader(fieldSep,
+ lineSep,
+ quote,
+ escape,
+ commentMarker,
+ ignoreLeadingSpace,
+ ignoreTrailingSpace,
+ headers,
+ inputBufSize,
+ maxCols)
+ with Iterator[Array[String]] {
+
+ private val reader = new CarbonStringIteratorReader(iter)
+ parser.beginParsing(reader)
+ private var nextRecord = parser.parseNext()
+
+ /**
+ * get the next parsed line.
+ *
+ * @return array of strings where each string is a field in the CSV record
+ */
+ def next: Array[String] = {
+ val curRecord = nextRecord
+ if(curRecord != null) {
+ nextRecord = parser.parseNext()
+ } else {
+ throw new NoSuchElementException("next record is null")
+ }
+ curRecord
+ }
+
+ def hasNext: Boolean = nextRecord != null
+
+}
+
+/**
+ * A Reader that "reads" from a sequence of lines. Spark's textFile method removes the newline
+ * at the end of each line, while the Univocity parser requires a Reader that provides access
+ * to the data to be parsed with the newlines still present.
+ * @param iter iterator over RDD[String]
+ */
+private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
+
+ private var next: Long = 0
+ private var length: Long = 0 // length of input so far
+ private var start: Long = 0
+ private var str: String = null // current string from iter
+
+ /**
+ * fetch the next string from iter once the current one is consumed, and
+ * pretend there is a newline at the end of every string we get from iter
+ */
+ private def refill(): Unit = {
+ if (length == next) {
+ if (iter.hasNext) {
+ str = iter.next
+ start = length
+ // reserve one extra character per line (except the last) to hold the restored '\n'
+ if (iter.hasNext) {
+ length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
+ } else {
+ length += str.length
+ }
+ } else {
+ str = null
+ }
+ }
+ }
+
+ /**
+ * read the next character, if at end of string pretend there is a new line
+ */
+ override def read(): Int = {
+ refill()
+ if(next >= length) {
+ -1
+ } else {
+ val cur = next - start
+ next += 1
+ if (cur == str.length) '\n' else str.charAt(cur.toInt)
+ }
+ }
+
+ /**
+ * read from str into cbuf
+ */
+ def read(cbuf: Array[Char], off: Int, len: Int): Int = {
+ refill()
+ var n = 0
+ if ((off < 0) || (off > cbuf.length) || (len < 0) ||
+ ((off + len) > cbuf.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException()
+ } else if (len == 0) {
+ n = 0
+ } else {
+ if (next >= length) { // end of input
+ n = -1
+ } else {
+ n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size
+ // add a '\n' to every line except the last one
+ if (n == length - next && iter.hasNext) {
+ str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
+ cbuf(off + n - 1) = '\n'
+ } else {
+ str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
+ }
+ next += n
+ if (n < len) {
+ val m = read(cbuf, off + n, len - n) // have more space, fetch more input from iter
+ if(m != -1) n += m
+ }
+ }
+ }
+ n
+ }
+
+ override def skip(ns: Long): Long = {
+ throw new IllegalArgumentException("Skip not implemented")
+ }
+
+ override def ready: Boolean = {
+ refill()
+ true
+ }
+
+ override def markSupported: Boolean = false
+
+ override def mark(readAheadLimit: Int): Unit = {
+ throw new IllegalArgumentException("Mark not implemented")
+ }
+
+ override def reset(): Unit = {
+ throw new IllegalArgumentException("Mark and hence reset not implemented")
+ }
+
+ def close(): Unit = { }
+}
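
To make the newline reconstruction concrete, a hedged sketch (placed in the same package because the reader is package-private; the object name is hypothetical) showing two iterator lines reading back as one newline-joined stream:

    package com.databricks.spark.sql.readers

    object ReaderSketch { // hypothetical driver, for illustration only
      def main(args: Array[String]): Unit = {
        val reader = new CarbonStringIteratorReader(Iterator("a,b", "c,d"))
        val buf = new Array[Char](16)
        val n = reader.read(buf, 0, buf.length)
        // prints "a,b" and "c,d" joined by the '\n' that textFile() stripped
        println(new String(buf, 0, n))
      }
    }
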
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
new file mode 100644
index 0000000..e751fe8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.databricks.spark.csv.newapi.CarbonTextFile
+import com.databricks.spark.csv.util._
+import com.databricks.spark.sql.readers._
+import org.apache.commons.csv._
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
+import org.apache.spark.sql.types._
+import org.slf4j.LoggerFactory
+
+import org.apache.carbondata.processing.etl.DataLoadingException
+
+case class CarbonCsvRelation protected[spark] (
+ location: String,
+ useHeader: Boolean,
+ delimiter: Char,
+ quote: Char,
+ escape: Character,
+ comment: Character,
+ parseMode: String,
+ parserLib: String,
+ ignoreLeadingWhiteSpace: Boolean,
+ ignoreTrailingWhiteSpace: Boolean,
+ userSchema: StructType = null,
+ charset: String = TextFile.DEFAULT_CHARSET.name(),
+ inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
+ extends BaseRelation with TableScan with InsertableRelation {
+
+ /**
+ * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
+ */
+ private val MAX_COMMENT_LINES_IN_HEADER = 10
+
+ private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
+
+ // Parse mode flags
+ if (!ParseModes.isValidMode(parseMode)) {
+ logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+ }
+
+ if ((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
+ logger.warn(s"Ignore white space options may not work with Commons parserLib option")
+ }
+
+ private val failFast = ParseModes.isFailFastMode(parseMode)
+ private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+ private val permissive = ParseModes.isPermissiveMode(parseMode)
+
+ val schema = inferSchema()
+
+ def tokenRdd(header: Array[String]): RDD[Array[String]] = {
+
+ val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
+
+ if(ParserLibs.isUnivocityLib(parserLib)) {
+ univocityParseCSV(baseRDD, header)
+ } else {
+ val csvFormat = CSVFormat.DEFAULT
+ .withDelimiter(delimiter)
+ .withQuote(quote)
+ .withEscape(escape)
+ .withSkipHeaderRecord(false)
+ .withHeader(header: _*)
+ .withCommentMarker(comment)
+
+ // If header is set, make sure firstLine is materialized before sending to executors.
+ val filterLine = if (useHeader) firstLine else null
+
+ baseRDD.mapPartitions { iter =>
+ // When using header, any input line that equals firstLine is assumed to be header
+ val csvIter = if (useHeader) {
+ iter.filter(_ != filterLine)
+ } else {
+ iter
+ }
+ parseCSV(csvIter, csvFormat)
+ }
+ }
+ }
+
+ // Builds the scan RDD: tokenizes the input and casts each field to its schema type.
+ def buildScan: RDD[Row] = {
+ val schemaFields = schema.fields
+ tokenRdd(schemaFields.map(_.name)).flatMap{ tokens =>
+
+ if (dropMalformed && schemaFields.length != tokens.size) {
+ logger.warn(s"Dropping malformed line: $tokens")
+ None
+ } else if (failFast && schemaFields.length != tokens.size) {
+ throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
+ } else {
+ var index: Int = 0
+ val rowArray = new Array[Any](schemaFields.length)
+ try {
+ index = 0
+ while (index < schemaFields.length) {
+ val field = schemaFields(index)
+ rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
+ index = index + 1
+ }
+ Some(Row.fromSeq(rowArray))
+ } catch {
+ case aiob: ArrayIndexOutOfBoundsException if permissive =>
+ (index until schemaFields.length).foreach(ind => rowArray(ind) = null)
+ Some(Row.fromSeq(rowArray))
+ }
+ }
+ }
+ }
+
+ private def inferSchema(): StructType = {
+ if (this.userSchema != null) {
+ userSchema
+ } else {
+ val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
+ val escapeVal = if (escape == null) '\\' else escape.charValue()
+ val commentChar: Char = if (comment == null) '\0' else comment
+ new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal,
+ commentMarker = commentChar).parseLine(firstLine)
+ } else {
+ val csvFormat = CSVFormat.DEFAULT
+ .withDelimiter(delimiter)
+ .withQuote(quote)
+ .withEscape(escape)
+ .withSkipHeaderRecord(false)
+ CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray
+ }
+ if(null == firstRow) {
+ throw new DataLoadingException("First line of the csv is not valid.")
+ }
+ val header = if (useHeader) {
+ firstRow
+ } else {
+ firstRow.zipWithIndex.map { case (value, index) => s"C$index"}
+ }
+ if (this.inferCsvSchema) {
+ InferSchema(tokenRdd(header), header)
+ } else {
+ // By default fields are assumed to be StringType
+ val schemaFields = header.map { fieldName =>
+ StructField(fieldName.toString, StringType, nullable = true)
+ }
+ StructType(schemaFields)
+ }
+ }
+ }
+
+ /**
+ * Returns the first line of the first non-empty file in path
+ */
+ private lazy val firstLine = {
+ val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
+ if (comment == null) {
+ csv.first()
+ } else {
+ csv.take(MAX_COMMENT_LINES_IN_HEADER)
+ .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString))
+ .getOrElse(sys.error(s"No uncommented header line in " +
+ s"first $MAX_COMMENT_LINES_IN_HEADER lines"))
+ }
+ }
+
+ private def univocityParseCSV(
+ file: RDD[String],
+ header: Seq[String]): RDD[Array[String]] = {
+ // If header is set, make sure firstLine is materialized before sending to executors.
+ val filterLine = if (useHeader) firstLine else null
+ val dataLines = if (useHeader) file.filter(_ != filterLine) else file
+ val rows = dataLines.mapPartitionsWithIndex({
+ case (split, iter) =>
+ val escapeVal = if (escape == null) '\\' else escape.charValue()
+ val commentChar: Char = if (comment == null) '\0' else comment
+
+ new CarbonBulkCsvReader(iter, split,
+ headers = header, fieldSep = delimiter,
+ quote = quote, escape = escapeVal, commentMarker = commentChar,
+ ignoreLeadingSpace = ignoreLeadingWhiteSpace,
+ ignoreTrailingSpace = ignoreTrailingWhiteSpace)
+ }, true)
+ rows
+ }
+
+ private def parseCSV(
+ iter: Iterator[String],
+ csvFormat: CSVFormat): Iterator[Array[String]] = {
+ iter.flatMap { line =>
+ try {
+ val records = CSVParser.parse(line, csvFormat).getRecords
+ if (records.isEmpty) {
+ logger.warn(s"Ignoring empty line: $line")
+ None
+ } else {
+ Some(records.get(0).asScala.toArray)
+ }
+ } catch {
+ case NonFatal(e) if !failFast =>
+ logger.error(s"Exception while parsing line: $line. ", e)
+ None
+ }
+ }
+ }
+
+ // The function below was borrowed from JSONRelation
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ val filesystemPath = new Path(location)
+ val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+ if (overwrite) {
+ try {
+ fs.delete(filesystemPath, true)
+ } catch {
+ case e: IOException =>
+ throw new IOException(
+ s"Unable to clear output directory ${filesystemPath.toString} prior"
+ + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
+ }
+ // Write the data. We assume that schema isn't changed, and we won't update it.
+ data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString))
+ } else {
+ sys.error("CSV tables only support INSERT OVERWRITE for now.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
new file mode 100644
index 0000000..b5d7542
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv.newapi
+
+import java.nio.charset.Charset
+
+import com.databricks.spark.csv.util.TextFile
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Creates RDDs of text lines from CSV files using TextInputFormat, tuning the
+ * input split size so that the full parallelism of the cluster is used.
+ */
+object CarbonTextFile {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def configSplitMaxSize(context: SparkContext, filePaths: String,
+ hadoopConfiguration: Configuration): Unit = {
+ val defaultParallelism = if (context.defaultParallelism < 1) {
+ 1
+ } else {
+ context.defaultParallelism
+ }
+ val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
+ val blockSize =
+ hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
+ LOGGER.info("[Block Distribution]")
+ // calculate a new split size so that all available parallelism can be used
+ if (spaceConsumed < defaultParallelism * blockSize) {
+ var newSplitSize: Long = spaceConsumed / defaultParallelism
+ if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
+ newSplitSize = CarbonCommonConstants.CARBON_16MB
+ }
+ hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
+ LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
+ s"defaultParallelism: $defaultParallelism")
+ LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
+ }
+ }
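+
+ // Worked example (hypothetical numbers): 1 GB of input, defaultParallelism = 16,
+ // 256 MB blocks. Since 1 GB < 16 * 256 MB, the split size is lowered to
+ // 1 GB / 16 = 64 MB (above the 16 MB floor), producing ~16 splits instead of 4.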
+ private def newHadoopRDD(sc: SparkContext, location: String) = {
+ val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
+ hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
+ hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
+ hadoopConfiguration.set("io.compression.codecs",
+ """org.apache.hadoop.io.compress.GzipCodec,
+ org.apache.hadoop.io.compress.DefaultCodec,
+ org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
+
+ configSplitMaxSize(sc, location, hadoopConfiguration)
+ new NewHadoopRDD[LongWritable, Text](
+ sc,
+ classOf[TextInputFormat],
+ classOf[LongWritable],
+ classOf[Text],
+ hadoopConfiguration).setName("newHadoopRDD-spark-csv")
+ }
+
+ def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = {
+ if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) {
+ newHadoopRDD(sc, location).map(pair => pair._2.toString)
+ } else {
+ // can't pass a Charset object here because it's not serializable
+ // TODO: maybe use mapPartitions instead?
+ newHadoopRDD(sc, location).map(
+ pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))
+ }
+ }
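+
+ // Usage sketch (hypothetical path):
+ // val lines: RDD[String] =
+ // CarbonTextFile.withCharset(sc, "hdfs:///data/input.csv", "ISO-8859-1")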
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
new file mode 100644
index 0000000..89595e5
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv.newapi
+
+import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
+import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Provides access to CSV data from pure SQL statements (i.e. for users of the
+ * JDBC server).
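+ *
+ * A hypothetical registration through Spark SQL:
+ * {{{
+ * CREATE TEMPORARY TABLE cars
+ * USING com.databricks.spark.csv.newapi
+ * OPTIONS (path "cars.csv", header "true", delimiter ",")
+ * }}}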
+ */
+class DefaultSource
+ extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
+
+ private def checkPath(parameters: Map[String, String]): String = {
+ parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
+ }
+
+ /**
+ * Creates a new relation for data stored in CSV format, given the parameters.
+ * Parameters must include 'path'; 'delimiter', 'quote' and 'header' are optional.
+ */
+ override def createRelation(sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ createRelation(sqlContext, parameters, null)
+ }
+
+ /**
+ * Creates a new relation for data stored in CSV format, given the parameters and
+ * a user-supplied schema.
+ * Parameters must include 'path'; 'delimiter', 'quote' and 'header' are optional.
+ */
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: StructType): BaseRelation = {
+ val path = checkPath(parameters)
+ val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ","))
+
+ val quote = parameters.getOrElse("quote", "\"")
+ val quoteChar = if (quote.length == 1) {
+ quote.charAt(0)
+ } else {
+ throw new Exception("Quotation cannot be more than one character.")
+ }
+
+ val escape = parameters.getOrElse("escape", null)
+ val escapeChar: Character = if (escape == null || (escape.length == 0)) {
+ null
+ } else if (escape.length == 1) {
+ escape.charAt(0)
+ } else {
+ throw new Exception("Escape character cannot be more than one character.")
+ }
+
+ val comment = parameters.getOrElse("comment", "#")
+ val commentChar: Character = if (comment == null) {
+ null
+ } else if (comment.length == 1) {
+ comment.charAt(0)
+ } else {
+ throw new Exception("Comment marker cannot be more than one character.")
+ }
+
+ val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+
+ val useHeader = parameters.getOrElse("header", "false")
+ val headerFlag = if (useHeader == "true") {
+ true
+ } else if (useHeader == "false") {
+ false
+ } else {
+ throw new Exception("Header flag can be true or false")
+ }
+
+ val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT)
+ val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false")
+ val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") {
+ false
+ } else if (ignoreLeadingWhiteSpace == "true") {
+ if (!ParserLibs.isUnivocityLib(parserLib)) {
+ throw new Exception("Ignore whitesspace supported for Univocity parser only")
+ }
+ true
+ } else {
+ throw new Exception("Ignore white space flag can be true or false")
+ }
+ val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false")
+ val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") {
+ false
+ } else if (ignoreTrailingWhiteSpace == "true") {
+ if (!ParserLibs.isUnivocityLib(parserLib)) {
+ throw new Exception("Ignore whitespace supported for the Univocity parser only")
+ }
+ true
+ } else {
+ throw new Exception("Ignore white space flag can be true or false")
+ }
+
+ val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
+ // TODO validate charset?
+
+ val inferSchema = parameters.getOrElse("inferSchema", "false")
+ val inferSchemaFlag = if (inferSchema == "false") {
+ false
+ } else if (inferSchema == "true") {
+ true
+ } else {
+ throw new Exception("Infer schema flag can be true or false")
+ }
+
+ CarbonCsvRelation(path,
+ headerFlag,
+ delimiter,
+ quoteChar,
+ escapeChar,
+ commentChar,
+ parseMode,
+ parserLib,
+ ignoreLeadingWhiteSpaceFlag,
+ ignoreTrailingWhiteSpaceFlag,
+ schema,
+ charset,
+ inferSchemaFlag)(sqlContext)
+ }
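+
+ // Example parameters map (hypothetical values):
+ // Map("path" -> "/tmp/cars.csv", "header" -> "true",
+ // "delimiter" -> "|", "mode" -> "FAILFAST", "charset" -> "UTF-8")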
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): BaseRelation = {
+ val path = checkPath(parameters)
+ val filesystemPath = new Path(path)
+ val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val doSave = if (fs.exists(filesystemPath)) {
+ mode match {
+ case SaveMode.Append =>
+ sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+ case SaveMode.Overwrite =>
+ fs.delete(filesystemPath, true)
+ true
+ case SaveMode.ErrorIfExists =>
+ sys.error(s"path $path already exists.")
+ case SaveMode.Ignore => false
+ }
+ } else {
+ true
+ }
+
+ // only "gzip" is recognised here; any other codec value falls back to no compression
+ val codec: Class[_ <: CompressionCodec] =
+ parameters.getOrElse("codec", "none") match {
+ case "gzip" => classOf[GzipCodec]
+ case _ => null
+ }
+
+ if (doSave) {
+ // Only save data when the save mode is not ignore.
+ data.saveAsCsvFile(path, parameters, codec)
+ }
+
+ createRelation(sqlContext, parameters, data.schema)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
new file mode 100644
index 0000000..9cc46c1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonCleanFilesRDD[V: ClassTag](
+ sc: SparkContext,
+ valueClass: Value[V],
+ databaseName: String,
+ tableName: String,
+ partitioner: Partitioner)
+ extends RDD[V](sc, Nil) {
+
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+ override def getPartitions: Array[Partition] = {
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+ splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+ val iter = new Iterator[(V)] {
+ val split = theSplit.asInstanceOf[CarbonLoadPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit.value)
+ // TODO call CARBON delete API
+
+
+ // this iterator yields exactly one value for the partition
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ finished = true
+ valueClass.getValue(null)
+ }
+
+ }
+ iter
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonLoadPartition]
+ val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+ logInfo("Host Name: " + s.head + s.length)
+ s
+ }
+}
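+
+// Usage sketch (hypothetical, assuming an existing Value[String] implementation):
+// val results =
+// new CarbonCleanFilesRDD(sc, stringValue, "default", "t1", partitioner).collect()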
+