Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/12 07:16:35 UTC
[1/2] carbondata git commit: load data for partition table
Repository: carbondata
Updated Branches:
refs/heads/12-dev 18329275f -> 2119fe891
load data for partition table
fix comments
fix comments
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c861a68
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c861a68
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c861a68
Branch: refs/heads/12-dev
Commit: 9c861a68d67ae198d06c062e2473a61af8a5b601
Parents: 1832927
Author: QiangCai <qi...@qq.com>
Authored: Mon Apr 24 11:28:28 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri May 12 15:15:17 2017 +0800
----------------------------------------------------------------------
.../core/scan/partition/HashPartitioner.java | 41 +++
.../core/scan/partition/ListPartitioner.java | 65 +++++
.../core/scan/partition/PartitionUtil.java | 109 +++++++
.../core/scan/partition/Partitioner.java | 27 ++
.../core/scan/partition/RangePartitioner.java | 137 +++++++++
.../carbondata/hadoop/CarbonInputSplit.java | 6 +-
.../TestDataLoadingForPartitionTable.scala | 289 +++++++++++++++++++
.../carbondata/spark/PartitionFactory.scala | 65 +++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 79 +++++
.../spark/rdd/CarbonDataRDDFactory.scala | 113 +++++++-
.../spark/rdd/CarbonDataRDDFactory.scala | 128 +++++++-
11 files changed, 1038 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
new file mode 100644
index 0000000..240c449
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+/**
+ * Hash Partitioner
+ */
+public class HashPartitioner implements Partitioner {
+
+ private int numPartitions = 0;
+
+ public HashPartitioner(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ @Override public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override public int getPartition(Object key) {
+ if (key == null) {
+ return 0;
+ }
+ return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+ }
+}
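For illustration, a minimal Scala sketch of how this hash partitioner assigns keys (assuming the core jar is on the classpath; null keys always land in partition 0):

  import org.apache.carbondata.core.scan.partition.HashPartitioner

  val partitioner = new HashPartitioner(3)
  // masking the hash with Integer.MAX_VALUE keeps the bucket index non-negative
  Seq[AnyRef]("a", "b", null, Int.box(42)).foreach { key =>
    println(s"$key -> ${partitioner.getPartition(key)}")
  }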
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
new file mode 100644
index 0000000..bab2ede
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+
+/**
+ * List Partitioner
+ */
+public class ListPartitioner implements Partitioner {
+
+ /**
+ * Maps each list partition value to its partition id.
+ */
+ private Map<Object, Integer> map = new java.util.HashMap<Object, Integer>();
+
+ private int numPartitions;
+
+ public ListPartitioner(PartitionInfo partitionInfo) {
+ List<List<String>> values = partitionInfo.getListInfo();
+ DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
+ numPartitions = values.size();
+ for (int i = 0; i < numPartitions; i++) {
+ for (String value : values.get(i)) {
+ map.put(PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType), i);
+ }
+ }
+ }
+
+ /**
+ * Returns the number of partitions, including one extra
+ * default partition for values not matched by any list.
+ */
+ @Override public int numPartitions() {
+ return numPartitions + 1;
+ }
+
+ @Override public int getPartition(Object key) {
+ Integer partition = map.get(key);
+ if (partition == null) {
+ return numPartitions;
+ }
+ return partition;
+ }
+}
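The lookup semantics can be modelled without the Carbon API: known values map to their partition id, and anything else falls into the extra default partition at index numPartitions. A simplified Scala sketch (raw string keys are assumed here; the class above first converts them to the column's type):

  // e.g. LIST_INFO '0, (1, 2)' builds this map with numPartitions = 2
  val numPartitions = 2
  val map = Map[Any, Int]("0" -> 0, "1" -> 1, "2" -> 1)
  def getPartition(key: Any): Int = map.getOrElse(key, numPartitions)

  assert(getPartition("1") == 1)
  assert(getPartition("9") == numPartitions) // unmatched value -> default partition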
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
new file mode 100644
index 0000000..a5b3a9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.math.BigDecimal;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.BitSet;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.commons.lang.StringUtils;
+
+public class PartitionUtil {
+
+ private static LogService LOGGER = LogServiceFactory.getLogService(PartitionUtil.class.getName());
+
+ private static final ThreadLocal<DateFormat> timestampFormatter = new ThreadLocal<DateFormat>() {
+ @Override protected DateFormat initialValue() {
+ return new SimpleDateFormat(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+ }
+ };
+
+ private static final ThreadLocal<DateFormat> dateFormatter = new ThreadLocal<DateFormat>() {
+ @Override protected DateFormat initialValue() {
+ return new SimpleDateFormat(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+ }
+ };
+
+ public static Partitioner getPartitioner(PartitionInfo partitionInfo) {
+ switch (partitionInfo.getPartitionType()) {
+ case HASH:
+ return new HashPartitioner(partitionInfo.getNumPartitions());
+ case LIST:
+ return new ListPartitioner(partitionInfo);
+ case RANGE:
+ return new RangePartitioner(partitionInfo);
+ default:
+ throw new UnsupportedOperationException(
+ "unsupport partition type: " + partitionInfo.getPartitionType().name());
+ }
+ }
+
+ public static Object getDataBasedOnDataType(String data, DataType actualDataType) {
+ if (data == null) {
+ return null;
+ }
+ if (actualDataType != DataType.STRING && StringUtils.isEmpty(data)) {
+ return null;
+ }
+ try {
+ switch (actualDataType) {
+ case STRING:
+ return data;
+ case INT:
+ return Integer.parseInt(data);
+ case SHORT:
+ return Short.parseShort(data);
+ case DOUBLE:
+ return Double.parseDouble(data);
+ case LONG:
+ return Long.parseLong(data);
+ case DATE:
+ return PartitionUtil.dateFormatter.get().parse(data).getTime();
+ case TIMESTAMP:
+ return PartitionUtil.timestampFormatter.get().parse(data).getTime();
+ case DECIMAL:
+ return new BigDecimal(data);
+ default:
+ return data;
+ }
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ public static BitSet generateBitSetBySize(int size, boolean isContainAll) {
+ BitSet bitSet = new BitSet(size);
+ if (isContainAll) {
+ bitSet.set(0, size);
+ }
+ return bitSet;
+ }
+
+}
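A short Scala sketch of the conversion behaviour (assuming the core jar is on the classpath): values parse into the partition column's Java type, and unparseable input returns null, so a bad value lands in the default partition instead of failing the load:

  import org.apache.carbondata.core.metadata.datatype.DataType
  import org.apache.carbondata.core.scan.partition.PartitionUtil

  println(PartitionUtil.getDataBasedOnDataType("42", DataType.INT))    // 42 as java.lang.Integer
  println(PartitionUtil.getDataBasedOnDataType("abc", DataType.INT))   // null (parse failure)
  println(PartitionUtil.getDataBasedOnDataType(null, DataType.STRING)) // null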
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
new file mode 100644
index 0000000..772a98e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.io.Serializable;
+
+public interface Partitioner extends Serializable {
+
+ int numPartitions();
+
+ int getPartition(Object key);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
new file mode 100644
index 0000000..c73f0b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Range Partitioner
+ */
+public class RangePartitioner implements Partitioner {
+
+ private int numPartitions;
+ private Object[] bounds;
+ private SerializableComparator comparator;
+
+ public RangePartitioner(PartitionInfo partitionInfo) {
+ List<String> values = partitionInfo.getRangeInfo();
+ DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
+ numPartitions = values.size();
+ bounds = new Object[numPartitions];
+ if (partitionColumnDataType == DataType.STRING) {
+ for (int i = 0; i < numPartitions; i++) {
+ bounds[i] = ByteUtil.toBytes(values.get(i));
+ }
+ } else {
+ for (int i = 0; i < numPartitions; i++) {
+ bounds[i] = PartitionUtil.getDataBasedOnDataType(values.get(i), partitionColumnDataType);
+ }
+ }
+
+ switch (partitionColumnDataType) {
+ case INT:
+ comparator = new IntSerializableComparator();
+ break;
+ case SHORT:
+ comparator = new ShortSerializableComparator();
+ break;
+ case DOUBLE:
+ comparator = new DoubleSerializableComparator();
+ break;
+ case LONG:
+ case DATE:
+ case TIMESTAMP:
+ comparator = new LongSerializableComparator();
+ break;
+ case DECIMAL:
+ comparator = new BigDecimalSerializableComparator();
+ break;
+ default:
+ comparator = new ByteArraySerializableComparator();
+ }
+ }
+
+ /**
+ * Returns the number of partitions, including one extra
+ * default partition for null keys and keys beyond the last bound.
+ */
+ @Override public int numPartitions() {
+ return numPartitions + 1;
+ }
+
+ @Override public int getPartition(Object key) {
+ if (key == null) {
+ return numPartitions;
+ } else {
+ for (int i = 0; i < numPartitions; i++) {
+ if (comparator.compareTo(key, bounds[i])) {
+ return i;
+ }
+ }
+ return numPartitions;
+ }
+ }
+
+ interface SerializableComparator extends Serializable {
+ boolean compareTo(Object key1, Object key2);
+ }
+
+ class ByteArraySerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ return ByteUtil.compare((byte[]) key1, (byte[]) key2) < 0;
+ }
+ }
+
+ class IntSerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ // compare directly instead of subtracting, which can overflow int
+ return (int) key1 < (int) key2;
+ }
+ }
+
+ class ShortSerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ return (short) key1 < (short) key2;
+ }
+ }
+
+ class DoubleSerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ return (double) key1 < (double) key2;
+ }
+ }
+
+ class LongSerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ // compare directly instead of subtracting, which can overflow long
+ return (long) key1 < (long) key2;
+ }
+ }
+
+ class BigDecimalSerializableComparator implements SerializableComparator {
+ @Override public boolean compareTo(Object key1, Object key2) {
+ return ((BigDecimal) key1).compareTo((BigDecimal) key2) < 0;
+ }
+ }
+
+}
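The range lookup can be modelled in a few lines of Scala (a simplified sketch, not the class above): the first bound the key is strictly less than wins, and null keys or keys beyond the last bound go to the extra default partition:

  val bounds = Array(10, 20, 30) // e.g. RANGE_INFO '10, 20, 30'
  def getPartition(key: Option[Int]): Int = key match {
    case None => bounds.length // null -> default partition
    case Some(k) =>
      val i = bounds.indexWhere(k < _)
      if (i == -1) bounds.length else i
  }

  assert(getPartition(Some(5)) == 0)  // 5 < 10
  assert(getPartition(Some(25)) == 2) // 20 <= 25 < 30
  assert(getPartition(None) == 3)     // default partition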
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..1a75bf9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -237,8 +237,10 @@ public class CarbonInputSplit extends FileSplit
String filePath1 = this.getPath().getName();
String filePath2 = other.getPath().getName();
if (CarbonTablePath.isCarbonDataFile(filePath1)) {
- int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
- int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+ int firstTaskId =
+ Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]);
+ int otherTaskId =
+ Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]);
if (firstTaskId != otherTaskId) {
return firstTaskId - otherTaskId;
}
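With partition loads the task number can carry a suffix after an underscore (the exact suffix format is assumed here for illustration), so the comparison now parses only the leading numeric token, e.g. in Scala:

  def taskId(taskNo: String): Int = taskNo.split("_")(0).toInt

  assert(taskId("3") == 3)   // plain task number still works
  assert(taskId("3_0") == 3) // suffix is ignored when ordering splits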
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
new file mode 100644
index 0000000..c0ee4f2
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.partition
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+ val defaultTimestampFormat = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+
+ override def beforeAll {
+ dropTable
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ sql(
+ """
+ | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(
+ """
+ | CREATE TABLE originMultiLoads (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ }
+
+ def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName,
+ carbonTable.getFactTableName)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+ val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ return file.getName.endsWith(".carbondata")
+ }
+ })
+
+ assert(dataFiles.size == partitions.size)
+
+ dataFiles.foreach { dataFile =>
+ val taskId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt
+ assert(partitions.exists(_ == taskId))
+ }
+ }
+
+ test("data loading for partition table: hash partition") {
+ sql(
+ """
+ | CREATE TABLE hashTable (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE hashTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_hashTable", "0", Seq(0, 1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTable order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("data loading for partition table: range partition") {
+ sql(
+ """
+ | CREATE TABLE rangeTable (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (doj Timestamp)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_rangeTable", "0", Seq(0, 1, 3, 4))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTable order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("data loading for partition table: list partition") {
+ sql(
+ """
+ | CREATE TABLE listTable (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='0, 1, (2, 3)')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_listTable", "0", Seq(1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTable order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+ }
+
+ test("Insert into for partition table: hash partition") {
+ sql(
+ """
+ | CREATE TABLE hashTableForInsert (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql("insert into hashTableForInsert select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+
+ validateDataFiles("default_hashTableForInsert", "0", Seq(0, 1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTableForInsert order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("Insert into for partition table: range partition") {
+ sql(
+ """
+ | CREATE TABLE rangeTableForInsert (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (doj Timestamp)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+ """.stripMargin)
+ sql("insert into rangeTableForInsert select empno, empname, designation, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, doj from originTable")
+
+ validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 3, 4))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableForInsert order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("Insert into partition table: list partition") {
+ sql(
+ """
+ | CREATE TABLE listTableForInsert (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='0, 1, (2, 3)')
+ """.stripMargin)
+ sql("insert into listTableForInsert select empno, empname, designation, doj, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, workgroupcategory from originTable")
+
+ validateDataFiles("default_listTableForInsert", "0", Seq(1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableForInsert order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+ }
+
+ test("multiple data loading for partition table") {
+ sql(
+ """
+ | CREATE TABLE multiLoads (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiLoads order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+ }
+
+ test("multiple insertInto for partition table") {
+ sql(
+ """
+ | CREATE TABLE multiInserts (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+ sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+ sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiInserts order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+ }
+
+ test("multiple data loading and insertInto for partition table") {
+ sql(
+ """
+ | CREATE TABLE loadAndInsert (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadAndInsert OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql("insert into loadAndInsert select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadAndInsert OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from loadAndInsert order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+ }
+
+ override def afterAll = {
+ dropTable
+ if (defaultTimestampFormat == null) {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ } else {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, defaultTimestampFormat)
+ }
+ }
+
+ def dropTable = {
+ sql("drop table if exists originTable")
+ sql("drop table if exists hashTable")
+ sql("drop table if exists rangeTable")
+ sql("drop table if exists listTable")
+ sql("drop table if exists hashTableForInsert")
+ sql("drop table if exists rangeTableForInsert")
+ sql("drop table if exists listTableForInsert")
+ sql("drop table if exists originMultiLoads")
+ sql("drop table if exists multiLoads")
+ sql("drop table if exists multiInserts")
+ sql("drop table if exists loadAndInsert")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
new file mode 100644
index 0000000..f7758a6
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.spark.Partitioner
+
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.scan.partition.{HashPartitioner => JavaHashPartitioner, ListPartitioner => JavaListPartitioner, RangePartitioner => JavaRangePartitioner}
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+
+object PartitionFactory {
+
+ def getPartitioner(partitionInfo: PartitionInfo): Partitioner = {
+ partitionInfo.getPartitionType match {
+ case PartitionType.HASH => new HashPartitioner(partitionInfo.getNumPartitions)
+ case PartitionType.LIST => new ListPartitioner(partitionInfo)
+ case PartitionType.RANGE => new RangePartitioner(partitionInfo)
+ case partitionType =>
+ throw new CarbonDataLoadingException(s"Unsupported partition type: ${partitionType}")
+ }
+ }
+}
+
+class HashPartitioner(partitions: Int) extends Partitioner {
+
+ private val partitioner = new JavaHashPartitioner(partitions)
+
+ override def numPartitions: Int = partitioner.numPartitions()
+
+ override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}
+
+class ListPartitioner(partitionInfo: PartitionInfo) extends Partitioner {
+
+ private val partitioner = new JavaListPartitioner(partitionInfo)
+
+ override def numPartitions: Int = partitioner.numPartitions()
+
+ override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}
+
+class RangePartitioner(partitionInfo: PartitionInfo) extends Partitioner {
+
+ private val partitioner = new JavaRangePartitioner(partitionInfo)
+
+ override def numPartitions: Int = partitioner.numPartitions()
+
+ override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}
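A hedged usage sketch of these wrappers (the SparkContext setup and data are assumed): key each row by its partition column value, shuffle with the wrapped partitioner, then drop the key so every task writes exactly one partition:

  import org.apache.spark.SparkContext
  import org.apache.carbondata.spark.{HashPartitioner => CarbonHashPartitioner}

  def shuffleByPartition(sc: SparkContext): Unit = {
    val keyedRows = sc.parallelize(Seq(("k1", "row1"), ("k2", "row2")))
    val partitioned = keyedRows.partitionBy(new CarbonHashPartitioner(3)).map(_._2)
    println(partitioned.getNumPartitions) // 3
  }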
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index a6d231d..4d0770d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -566,3 +566,82 @@ class LazyRddIterator(serializer: SerializerInstance,
}
}
+
+/**
+ * Loads data into Carbon from an RDD for a partition table.
+ * @see org.apache.carbondata.processing.newflow.DataLoadExecutor
+ */
+class PartitionTableDataLoaderRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ prev: RDD[Row]) extends RDD[(K, V)](prev) {
+
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+ val partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ val model: CarbonLoadModel = carbonLoadModel
+ val uniqueLoadStatusId =
+ carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+ try {
+
+ loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ carbonLoadModel.setPartitionId(partitionID)
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ carbonLoadModel.setPreFetch(false)
+
+ val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
+ new NewRddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel, context)
+ }
+
+ val loader = new SparkPartitionLoader(model,
+ theSplit.index,
+ null,
+ String.valueOf(loadCount),
+ loadMetadataDetails)
+ // Initialize to set carbon properties
+ loader.initialize()
+ new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+ } catch {
+ case e: BadRecordFoundException =>
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ logInfo("Bad Record Found")
+ case e: Exception =>
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ logInfo("DataLoad For Partition Table failure", e)
+ LOGGER.error(e)
+ throw e
+ } finally {
+ // clean up the folders and files created locally for data load operation
+ CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+ // in case of failure the same operation will be re-tried several times.
+ // So print the data load statistics only in case of non failure case
+ if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ .equals(loadMetadataDetails.getLoadStatus)) {
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+ .printStatisticsInfo(model.getPartitionId)
+ }
+ }
+ var finished = false
+
+ override def hasNext: Boolean = !finished
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+ }
+ }
+ iter
+ }
+
+ override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4cca0a3..4e100ed 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.spark.rdd
+import java.text.SimpleDateFormat
import java.util
import java.util.UUID
import java.util.concurrent._
@@ -28,10 +29,11 @@ import scala.util.control.Breaks._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
import org.apache.spark.sql.hive.DistributionUtil
@@ -43,12 +45,15 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -56,7 +61,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -732,6 +737,23 @@ object CarbonDataRDDFactory {
}
+ def loadDataForPartitionTable(): Unit = {
+ try {
+ val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+ status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ rdd).collect()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "load data failed for partition table")
+ throw ex
+ }
+ }
+
if (!updateModel.isDefined) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -742,10 +764,11 @@ object CarbonDataRDDFactory {
try {
if (updateModel.isDefined) {
loadDataFrameForUpdate()
+ } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+ loadDataForPartitionTable()
} else if (dataFrame.isDefined) {
loadDataFrame()
- }
- else {
+ } else {
loadDataFile()
}
if (updateModel.isDefined) {
@@ -931,6 +954,84 @@ object CarbonDataRDDFactory {
}
+ /**
+ * Repartition the input data for the partition table.
+ * @param sqlContext
+ * @param dataFrame
+ * @param carbonLoadModel
+ * @return the input rows repartitioned by the partition column
+ */
+ private def repartitionInputData(sqlContext: SQLContext,
+ dataFrame: Option[DataFrame],
+ carbonLoadModel: CarbonLoadModel): RDD[Row] = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
+ val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+ val columns = carbonLoadModel.getCsvHeaderColumns
+ var partitionColumnIndex = -1
+ for (i <- 0 until columns.length) {
+ if (partitionColumn.equals(columns(i))) {
+ partitionColumnIndex = i
+ }
+ }
+ if (partitionColumnIndex == -1) {
+ throw new DataLoadingException("Partition column not found.")
+ }
+ // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+ val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+ // input data from DataFrame
+ val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+ val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ dataFrame.get.rdd.map { row =>
+ (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ }
+ } else {
+ // input data from csv files
+ val hadoopConfiguration = new Configuration()
+ CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val columnCount = columns.length
+ new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sqlContext.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ hadoopConfiguration
+ ).map { currentRow =>
+ val row = new StringArrayRow(new Array[String](columnCount))
+ (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ }
+ }
+
+ val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+ if (partitionColumnDataType == DataType.STRING) {
+ if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+ inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
+ .partitionBy(partitioner)
+ .map(_._2)
+ } else {
+ inputRDD.partitionBy(partitioner)
+ .map(_._2)
+ }
+ } else {
+ inputRDD.map { row =>
+ (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+ }
+ .partitionBy(partitioner)
+ .map(_._2)
+ }
+ }
+
private def shutDownDictionaryServer(carbonLoadModel: CarbonLoadModel,
result: Future[DictionaryServer], writeDictionary: Boolean = true): Unit = {
// write dictionary file and shutdown dictionary server
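Note the special case above for string range partitions: keys are shuffled as bytes so that their ordering agrees with the range partitioner's byte-array comparator, e.g.:

  import org.apache.carbondata.core.util.ByteUtil

  // strings compare the same way their byte forms do
  val lessThan = ByteUtil.compare(ByteUtil.toBytes("apple"), ByteUtil.toBytes("banana")) < 0
  println(lessThan) // true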
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c861a68/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4656c2e..a8f2994 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,8 +17,8 @@
package org.apache.carbondata.spark.rdd
+import java.text.SimpleDateFormat
import java.util
-import java.util.UUID
import java.util.concurrent._
import scala.collection.JavaConverters._
@@ -26,12 +26,13 @@ import scala.collection.mutable.ListBuffer
import scala.util.Random
import scala.util.control.Breaks._
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
import org.apache.spark.sql.hive.DistributionUtil
@@ -43,20 +44,22 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
-import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory}
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -653,6 +656,23 @@ object CarbonDataRDDFactory {
}
}
+ def loadDataForPartitionTable(): Unit = {
+ try {
+ val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+ status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ rdd).collect()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "load data failed for partition table")
+ throw ex
+ }
+ }
+
if (!updateModel.isDefined) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -661,11 +681,15 @@ object CarbonDataRDDFactory {
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
try {
- if (dataFrame.isDefined) {
- loadDataFrame()
- }
- else {
- loadDataFile()
+ if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+ loadDataForPartitionTable()
+ } else {
+ if (dataFrame.isDefined) {
+ loadDataFrame()
+ }
+ else {
+ loadDataFile()
+ }
}
if (updateModel.isDefined) {
@@ -862,6 +886,84 @@ object CarbonDataRDDFactory {
}
+ /**
+ * Repartition the input data for the partition table.
+ * @param sqlContext
+ * @param dataFrame
+ * @param carbonLoadModel
+ * @return the input rows repartitioned by the partition column
+ */
+ private def repartitionInputData(sqlContext: SQLContext,
+ dataFrame: Option[DataFrame],
+ carbonLoadModel: CarbonLoadModel): RDD[Row] = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
+ val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+ val columns = carbonLoadModel.getCsvHeaderColumns
+ var partitionColumnIndex = -1
+ for (i <- 0 until columns.length) {
+ if (partitionColumn.equals(columns(i))) {
+ partitionColumnIndex = i
+ }
+ }
+ if (partitionColumnIndex == -1) {
+ throw new DataLoadingException("Partition column not found.")
+ }
+ // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+ val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+ // input data from DataFrame
+ val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+ val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ dataFrame.get.rdd.map { row =>
+ (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ }
+ } else {
+ // input data from csv files
+ val hadoopConfiguration = new Configuration()
+ CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val columnCount = columns.length
+ new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sqlContext.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ hadoopConfiguration
+ ).map { currentRow =>
+ val row = new StringArrayRow(new Array[String](columnCount))
+ (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ }
+ }
+
+ val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+ if (partitionColumnDataType == DataType.STRING) {
+ if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+ inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
+ .partitionBy(partitioner)
+ .map(_._2)
+ } else {
+ inputRDD.partitionBy(partitioner)
+ .map(_._2)
+ }
+ } else {
+ inputRDD.map { row =>
+ (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+ }
+ .partitionBy(partitioner)
+ .map(_._2)
+ }
+ }
+
private def shutdownDictionaryServer(carbonLoadModel: CarbonLoadModel,
result: Future[DictionaryServer], writeDictionary: Boolean = true) = {
// write dictionary file and shutdown dictionary server
[2/2] carbondata git commit: [CARBONDATA-937] Data loading for partition table(12-dev) This closes #842
Posted by ja...@apache.org.
[CARBONDATA-937] Data loading for partition table(12-dev) This closes #842
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2119fe89
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2119fe89
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2119fe89
Branch: refs/heads/12-dev
Commit: 2119fe89152a7f62c608685a5500700b8c7d371f
Parents: 1832927 9c861a6
Author: jackylk <ja...@huawei.com>
Authored: Fri May 12 15:16:03 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri May 12 15:16:03 2017 +0800
----------------------------------------------------------------------
.../core/scan/partition/HashPartitioner.java | 41 +++
.../core/scan/partition/ListPartitioner.java | 65 +++++
.../core/scan/partition/PartitionUtil.java | 109 +++++++
.../core/scan/partition/Partitioner.java | 27 ++
.../core/scan/partition/RangePartitioner.java | 137 +++++++++
.../carbondata/hadoop/CarbonInputSplit.java | 6 +-
.../TestDataLoadingForPartitionTable.scala | 289 +++++++++++++++++++
.../carbondata/spark/PartitionFactory.scala | 65 +++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 79 +++++
.../spark/rdd/CarbonDataRDDFactory.scala | 113 +++++++-
.../spark/rdd/CarbonDataRDDFactory.scala | 128 +++++++-
11 files changed, 1038 insertions(+), 21 deletions(-)
----------------------------------------------------------------------