You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/12/24 14:13:03 UTC
[1/3] incubator-carbondata git commit: extract command to common
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 956eb3348 -> 4a8e15d34
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/resources/data_alltypes.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/data_alltypes.csv b/integration/spark2/src/test/resources/data_alltypes.csv
new file mode 100644
index 0000000..2722edd
--- /dev/null
+++ b/integration/spark2/src/test/resources/data_alltypes.csv
@@ -0,0 +1,10 @@
+1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa
+5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb
+1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc
+1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd
+1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee
+3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff
+2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg
+1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh
+4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii
+1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index c8d5221..b7617e8 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -21,10 +21,10 @@ package org.apache.carbondata.spark.testsuite.allqueries
import java.io.File
-import org.apache.spark.sql.{Row, SaveMode}
-import org.apache.spark.sql.common.util.CarbonSessionTest._
+import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties
class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
- clean
+ dropAllTable
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../../spark")
.getCanonicalPath
@@ -57,14 +57,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
sql("INSERT INTO table Carbon_automation_test select * from Carbon_automation_test_hive");
}
- def clean{
+ def dropAllTable{
sql("drop table if exists Carbon_automation_test")
sql("drop table if exists Carbon_automation_hive")
sql("drop table if exists Carbon_automation_test_hive")
}
override def afterAll {
- clean
+ dropAllTable
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index bab78b9..057d894 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -17,8 +17,7 @@
package org.apache.spark.carbondata
-import org.apache.spark.carbondata.util.QueryTest
-import org.apache.spark.carbondata.util.QueryTest._
+import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
deleted file mode 100644
index 5a2a27e..0000000
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.carbondata.util
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.scalatest.FunSuite
-
-import org.apache.carbondata.core.util.CarbonProperties
-
-
-object QueryTest {
-
- val (spark: SparkSession, storeLocation: String, warehouse: String, metastoredb: String) = {
-
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
- val storeLocation = s"$rootPath/integration/spark2/target/store"
- val warehouse = s"$rootPath/integration/spark2/target/warehouse"
- val metastoredb = s"$rootPath/integration/spark2/target/metastore_db"
-
- val spark = SparkSession
- .builder()
- .master("local")
- .appName("Spark2Testcases")
- .enableHiveSupport()
- .config("spark.sql.warehouse.dir", warehouse)
- .config("javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$metastoredb;create=true")
- .getOrCreate()
- spark.sparkContext.setLogLevel("ERROR")
- CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
- .addProperty("carbon.storelocation", storeLocation)
-
- CarbonEnv.init(spark.sqlContext)
- CarbonEnv.get.carbonMetastore.cleanStore()
- (spark, storeLocation, warehouse, metastoredb)
- }
-
- def clean: Unit = {
- val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
- clean(storeLocation)
- }
-
-}
-
-class QueryTest extends FunSuite
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index ec241ae..6cce86a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -19,11 +19,7 @@
package org.apache.spark.carbondata.vectorreader
-import java.io.File
-
-import org.apache.spark.carbondata.util.QueryTest
-import org.apache.spark.carbondata.util.QueryTest._
-import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.execution.{BatchedDataSourceScanExec, RowDataSourceScanExec}
import org.scalatest.BeforeAndAfterAll
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
deleted file mode 100644
index d29196e..0000000
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.common.util
-
-import java.io.File
-
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.commons.io.FileUtils
-
-object CarbonSessionTest extends{
-
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
- val storeLocation = s"$rootPath/examples/spark2/target/store"
- val warehouse = s"$rootPath/examples/spark2/target/warehouse"
- val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
- val spark = {
-
- // clean data folder
- if (true) {
- val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
- clean(storeLocation)
- clean(warehouse)
- clean(metastoredb)
- }
-
- val spark = SparkSession
- .builder()
- .master("local")
- .appName("CarbonExample")
- .enableHiveSupport()
- .config("spark.sql.warehouse.dir", warehouse)
- .config("javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$metastoredb;create=true")
- .getOrCreate()
-
- CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
- .addProperty("carbon.storelocation", storeLocation)
-
- spark.sparkContext.setLogLevel("WARN")
-
- spark
- }
-
- val sc = spark.sparkContext
-
- lazy val implicits = spark.implicits
-
- def sql(sqlText: String): DataFrame = spark.sql(sqlText)
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 44d3bfa..45dcb03 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -17,14 +17,18 @@
package org.apache.spark.sql.common.util
+import java.io.File
import java.util.{Locale, TimeZone}
-import org.apache.carbondata.common.logging.LogServiceFactory
-
import scala.collection.JavaConversions._
+
+import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
class QueryTest extends PlanTest {
@@ -35,6 +39,50 @@ class QueryTest extends PlanTest {
// Add Locale setting
Locale.setDefault(Locale.US)
+
+ val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+ val spark = {
+ // clean data folder
+ if (true) {
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ clean(warehouse)
+ clean(metastoredb)
+ }
+
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonExample")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir", warehouse)
+ .config("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastoredb;create=true")
+ .getOrCreate()
+
+ CarbonProperties.getInstance()
+ .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+ .addProperty("carbon.storelocation", storeLocation)
+
+ spark.sparkContext.setLogLevel("WARN")
+ spark
+ }
+
+ val sc = spark.sparkContext
+
+ lazy val implicits = spark.implicits
+
+ def sql(sqlText: String): DataFrame = spark.sql(sqlText)
+
+ def clean: Unit = {
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ }
+
/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
* none of keywords are listed in the answer
@@ -79,6 +127,62 @@ class QueryTest extends PlanTest {
protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect())
}
+
+
+ protected def createAndLoadInputTable(inputTableName: String, inputPath: String): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE $inputTableName
+ | ( shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | timestampField string,
+ | decimalField decimal(18,2),
+ | dateField string,
+ | charField char(5)
+ | )
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ """.stripMargin)
+
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputPath'
+ | INTO TABLE $inputTableName
+ """.stripMargin)
+ }
+
+ protected def createAndLoadTestTable(tableName: String, inputTableName: String): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE $tableName(
+ | shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | timestampField timestamp,
+ | decimalField decimal(18,2),
+ | dateField date,
+ | charField char(5)
+ | )
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS ('tableName' '$tableName')
+ """.stripMargin)
+ sql(
+ s"""
+ | INSERT INTO TABLE $tableName
+ | SELECT shortField, intField, bigintField, doubleField, stringField,
+ | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
+ | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
+ | FROM $inputTableName
+ """.stripMargin)
+ }
+
+ protected def dropTable(tableName: String): Unit ={
+ sql(s"DROP TABLE IF EXISTS $tableName")
+ }
}
object QueryTest {
@@ -146,4 +250,5 @@ object QueryTest {
return None
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
new file mode 100644
index 0000000..f80b12d
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.sql.Timestamp
+import java.util.Date
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.util.CarbonUtil
+
+class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../../spark2")
+ .getCanonicalPath
+ dropTable("csv_table")
+ createAndLoadInputTable("csv_table", s"$currentDirectory/src/test/resources/data_alltypes.csv")
+ createAndLoadTestTable("carbon_table", "csv_table")
+ }
+
+ override def afterAll(): Unit = {
+ dropTable("csv_table")
+ dropTable("carbon_table")
+ }
+
+ test("show segment") {
+ ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table"))
+ }
+
+ test("delete segment by id") {
+ DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table", "0"))
+ assert(!CarbonStore.isSegmentValid("default", "carbon_table", "0"))
+ }
+
+ test("delete segment by date") {
+ createAndLoadTestTable("carbon_table2", "csv_table")
+ val time = new Timestamp(new Date().getTime)
+ DeleteSegmentByDate.main(Array(s"${CarbonUtil.getCarbonStorePath}", "carbon_table2", time.toString))
+ assert(!CarbonStore.isSegmentValid("default", "carbon_table2", "0"))
+ dropTable("carbon_table2")
+ }
+
+ test("clean files") {
+ val table = "carbon_table3"
+ createAndLoadTestTable(table, "csv_table")
+ ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
+ DeleteSegmentById.main(Array(s"${CarbonUtil.getCarbonStorePath}", table, "0"))
+ ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
+ CleanFiles.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
+ ShowSegments.main(Array(s"${CarbonUtil.getCarbonStorePath}", table))
+ val tablePath = s"${CarbonUtil.getCarbonStorePath}${File.separator}default${File.separator}$table"
+ val f = new File(s"$tablePath/Fact/Part0")
+ assert(f.isDirectory)
+
+ // all segment folders should be deleted after CleanFiles command
+ assert(f.list().length == 0)
+ dropTable(table)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index a3bc3a3..030ee0b 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -589,5 +589,9 @@ public class SegmentStatusManager {
public List<String> getInvalidSegments() {
return invalidSegments;
}
+
+ public boolean isValid(String segment) {
+ return validSegments.contains(segment);
+ }
}
}
[2/3] incubator-carbondata git commit: extract command to common
Posted by ch...@apache.org.
extract command to common
fix
fix testcase
remove redundant QueryTest
fix comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b50866b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b50866b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b50866b3
Branch: refs/heads/master
Commit: b50866b3a2e3eb7d704c6fa9f6e6fcdfc5e5de21
Parents: 956eb33
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 22 15:09:53 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Sat Dec 24 22:10:22 2016 +0800
----------------------------------------------------------------------
.../core/carbon/AbsoluteTableIdentifier.java | 6 +
.../core/carbon/path/CarbonStorePath.java | 6 +-
.../core/carbon/path/CarbonTablePath.java | 35 +-
.../carbondata/examples/CarbonExample.scala | 1 +
.../carbondata/spark/load/CarbonLoaderUtil.java | 19 +-
.../spark/load/DeleteLoadFolders.java | 37 +-
.../carbondata/spark/util/LoadMetadataUtil.java | 7 +-
.../org/apache/carbondata/api/CarbonStore.scala | 176 ++++++++++
.../spark/rdd/DataManagementFunc.scala | 163 ++-------
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 349 ++++---------------
.../apache/spark/sql/hive/CarbonMetastore.scala | 6 +-
.../spark/load/CarbonLoaderUtilTest.java | 1 -
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 275 +--------------
.../apache/spark/sql/hive/CarbonMetastore.scala | 10 +-
.../org/apache/spark/util/CleanFiles.scala | 15 +-
.../org/apache/spark/util/Compaction.scala | 11 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 14 +-
.../apache/spark/util/DeleteSegmentById.scala | 14 +-
.../org/apache/spark/util/ShowSegments.scala | 38 +-
.../org/apache/spark/util/TableAPIUtil.scala | 23 +-
.../spark2/src/test/resources/data_alltypes.csv | 10 +
.../AllDataTypesTestCaseAggregate.scala | 10 +-
.../carbondata/CarbonDataSourceSuite.scala | 3 +-
.../spark/carbondata/util/QueryTest.scala | 66 ----
.../vectorreader/VectorReaderTestCase.scala | 6 +-
.../sql/common/util/CarbonSessionTest.scala | 74 ----
.../spark/sql/common/util/QueryTest.scala | 111 +++++-
.../apache/spark/util/CarbonCommandSuite.scala | 78 +++++
.../lcm/status/SegmentStatusManager.java | 4 +
31 files changed, 608 insertions(+), 968 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
index cf7d92f..1424ba5 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
/**
* identifier which will have store path and carbon table identifier
@@ -65,6 +66,11 @@ public class AbsoluteTableIdentifier implements Serializable {
return carbonTableIdentifier;
}
+ public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+ CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
+ return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+ }
+
public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
String formattedTablePath = tablePath.replace('\\', '/');
String[] names = formattedTablePath.split("/");
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
index 214c633..afe4d9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonStorePath.java
@@ -45,10 +45,14 @@ public class CarbonStorePath extends Path {
CarbonTablePath carbonTablePath = new CarbonTablePath(tableIdentifier,
storePath + File.separator + tableIdentifier.getDatabaseName() + File.separator
+ tableIdentifier.getTableName());
-
return carbonTablePath;
}
+ public static CarbonTablePath getCarbonTablePath(String storePath,
+ String dbName, String tableName) {
+ return new CarbonTablePath(storePath, dbName, tableName);
+ }
+
public static CarbonTablePath getCarbonTablePath(AbsoluteTableIdentifier identifier) {
CarbonTableIdentifier id = identifier.getCarbonTableIdentifier();
return new CarbonTablePath(id, identifier.getTablePath());
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index f90073e..54e7266 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -63,6 +63,12 @@ public class CarbonTablePath extends Path {
this.tablePath = tablePathString;
}
+ public CarbonTablePath(String storePath, String dbName, String tableName) {
+ super(storePath + File.separator + dbName + File.separator + tableName);
+ this.carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "");
+ this.tablePath = storePath + File.separator + dbName + File.separator + tableName;
+ }
+
/**
* The method returns the folder path containing the carbon file.
*
@@ -249,22 +255,6 @@ public class CarbonTablePath extends Path {
}
/**
- * Gets absolute path of data file of given aggregate table
- *
- * @param aggTableID unique aggregate table identifier
- * @param partitionId unique partition identifier
- * @param segmentId unique partition identifier
- * @param filePartNo data file part number
- * @param factUpdateTimeStamp unique identifier to identify an update
- * @return absolute path of data file stored in carbon data format
- */
- public String getCarbonAggDataFilePath(String aggTableID, String partitionId, String segmentId,
- Integer filePartNo, Integer taskNo, String factUpdateTimeStamp) {
- return getAggSegmentDir(aggTableID, partitionId, segmentId) + File.separator
- + getCarbonDataFileName(filePartNo, taskNo, factUpdateTimeStamp);
- }
-
- /**
* Gets data file name only with out path
*
* @param filePartNo data file part number
@@ -297,15 +287,6 @@ public class CarbonTablePath extends Path {
return getFactDir() + File.separator + PARTITION_PREFIX + partitionId;
}
- private String getAggSegmentDir(String aggTableID, String partitionId, String segmentId) {
- return getAggPartitionDir(aggTableID, partitionId) + File.separator + SEGMENT_PREFIX
- + segmentId;
- }
-
- private String getAggPartitionDir(String aggTableID, String partitionId) {
- return getAggregateTableDir(aggTableID) + File.separator + PARTITION_PREFIX + partitionId;
- }
-
private String getMetaDataDir() {
return tablePath + File.separator + METADATA_DIR;
}
@@ -314,10 +295,6 @@ public class CarbonTablePath extends Path {
return tablePath + File.separator + FACT_DIR;
}
- private String getAggregateTableDir(String aggTableId) {
- return tablePath + File.separator + AGGREGATE_TABLE_PREFIX + aggTableId;
- }
-
@Override public boolean equals(Object o) {
if (!(o instanceof CarbonTablePath)) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index c2e135a..273de95 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CleanFiles, ShowSegments}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 492ceee..0d2ab6f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.ColumnIdentifier;
import org.apache.carbondata.core.carbon.datastore.block.Distributable;
@@ -86,7 +85,6 @@ import com.google.gson.Gson;
import org.apache.spark.SparkConf;
import org.apache.spark.util.Utils;
-
public final class CarbonLoaderUtil {
private static final LogService LOGGER =
@@ -449,11 +447,10 @@ public final class CarbonLoaderUtil {
return status;
}
- public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
- String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
- schema.getCarbonTable().getCarbonTableIdentifier());
+ public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+ List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
DataOutputStream dataOutputStream;
@@ -464,11 +461,9 @@ public final class CarbonLoaderUtil {
new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
try {
-
dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
brWriter.write(metadataInstance);
} finally {
@@ -478,12 +473,10 @@ public final class CarbonLoaderUtil {
}
} catch (Exception e) {
LOGGER.error("error in flushing ");
-
}
CarbonUtil.closeStreams(brWriter);
writeOperation.close();
}
-
}
public static String readCurrentTime() {
@@ -495,10 +488,10 @@ public final class CarbonLoaderUtil {
return date;
}
- public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
+ public static String extractLoadMetadataFileLocation(String dbName, String tableName) {
CarbonTable carbonTable =
org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+ .getCarbonTable(dbName + '_' + tableName);
return carbonTable.getMetaDataFilepath();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index 2b3979f..0663abc 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -37,6 +37,7 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.path.CarbonStorePath;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -45,7 +46,6 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.load.LoadMetadataDetails;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
public final class DeleteLoadFolders {
@@ -58,23 +58,13 @@ public final class DeleteLoadFolders {
/**
* returns segment path
- *
- * @param loadModel
- * @param storeLocation
- * @param partitionId
- * @param oneLoad
- * @return
*/
- private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
+ private static String getSegmentPath(String dbName, String tableName, String storeLocation,
int partitionId, LoadMetadataDetails oneLoad) {
-
- String path = null;
+ CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
+ new CarbonTableIdentifier(dbName, tableName, ""));
String segmentId = oneLoad.getLoadName();
-
- path = new CarbonStorePath(storeLocation).getCarbonTablePath(
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
- .getCarbonDataDirectoryPath("" + partitionId, segmentId);
- return path;
+ return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
}
private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
@@ -221,32 +211,21 @@ public final class DeleteLoadFolders {
}
}
- /**
- * @param loadModel
- * @param storeLocation
- * @param isForceDelete
- * @param details
- * @return
- *
- */
- public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
+ public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
List<LoadMetadataDetails> deletedLoads =
new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
boolean isDeleted = false;
-
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+ String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
if (deletionStatus) {
isDeleted = true;
oneLoad.setVisibility("false");
deletedLoads.add(oneLoad);
- LOGGER.info("Info: " +
- " Deleted the load " + oneLoad.getLoadName());
+ LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 11cf9f8..1c21fab 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -28,20 +28,19 @@
*/
package org.apache.carbondata.spark.util;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.load.LoadMetadataDetails;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
public final class LoadMetadataUtil {
private LoadMetadataUtil() {
}
- public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
- CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+ public static boolean isLoadDeletionRequired(String dbName, String tableName) {
+ CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName);
String metaDataLocation = table.getMetaDataFilepath();
LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
new file mode 100644
index 0000000..6fb2df1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.api
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.types.TimestampType
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.DataManagementFunc
+
+object CarbonStore {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def showSegments(
+ dbName: String,
+ tableName: String,
+ limit: Option[String]): Seq[Row] = {
+ val tableUniqueName = dbName + "_" + tableName
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val path = carbonTable.getMetaDataFilepath
+ val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
+ if (loadMetadataDetailsArray.nonEmpty) {
+ val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
+ var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
+ java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
+ }
+ if (limit.isDefined) {
+ loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
+ .filter(load => load.getVisibility.equalsIgnoreCase("true"))
+ val limitLoads = limit.get
+ try {
+ val lim = Integer.parseInt(limitLoads)
+ loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
+ } catch {
+ case _: NumberFormatException => sys.error("Entered limit is not a valid number")
+ }
+ }
+
+ loadMetadataDetailsSortedArray
+ .filter(_.getVisibility.equalsIgnoreCase("true"))
+ .map { load =>
+ Row(
+ load.getLoadName,
+ load.getLoadStatus,
+ new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
+ new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime)
+ )
+ }.toSeq
+ } else {
+ Seq.empty
+ }
+ }
+
+ def cleanFiles(
+ dbName: String,
+ tableName: String,
+ storePath: String): Unit = {
+ LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
+ try {
+ DataManagementFunc.cleanFiles(dbName, tableName, storePath)
+ LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
+ } catch {
+ case ex: Exception =>
+ sys.error(ex.getMessage)
+ }
+ Seq.empty
+ }
+
+ // validates load ids
+ private def validateLoadIds(loadids: Seq[String]): Unit = {
+ if (loadids.isEmpty) {
+ val errorMessage = "Error: Segment id(s) should not be empty."
+ throw new MalformedCarbonCommandException(errorMessage)
+ }
+ }
+
+ def deleteLoadById(
+ loadids: Seq[String],
+ dbName: String,
+ tableName: String): Unit = {
+
+ LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
+ validateLoadIds(loadids)
+
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
+ val path = carbonTable.getMetaDataFilepath
+
+ try {
+ val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
+ if (invalidLoadIds.isEmpty) {
+ LOGGER.audit(s"Delete segment by Id is successful for $dbName.$tableName.")
+ } else {
+ sys.error(s"Delete segment by Id is failed. Invalid ID is: ${invalidLoadIds.mkString(",")}")
+ }
+ } catch {
+ case ex: Exception =>
+ sys.error(ex.getMessage)
+ }
+ Seq.empty
+ }
+
+ def deleteLoadByDate(
+ timestamp: String,
+ dbName: String,
+ tableName: String): Unit = {
+ LOGGER.audit(s"Delete segment by date request has been received for $dbName.$tableName")
+
+ val time = validateTimeFormat(timestamp)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
+ val path = carbonTable.getMetaDataFilepath
+
+ try {
+ val invalidLoadTimestamps =
+ SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier,
+ timestamp,
+ path,
+ time).asScala
+ if (invalidLoadTimestamps.isEmpty) {
+ LOGGER.audit(s"Delete segment by date is successful for $dbName.$tableName.")
+ } else {
+ sys.error("Delete segment by date is failed. No matching segment found.")
+ }
+ } catch {
+ case ex: Exception =>
+ sys.error(ex.getMessage)
+ }
+ }
+
+ // this function is for test only
+ def isSegmentValid(
+ dbName: String,
+ tableName: String,
+ segmentId: String): Boolean = {
+ val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+ val status = SegmentStatusManager.getSegmentStatus(identifier)
+ status.isValid(segmentId)
+ }
+
+ private def validateTimeFormat(timestamp: String): Long = {
+ val timeObj = Cast(Literal(timestamp), TimestampType).eval()
+ if (null == timeObj) {
+ val errorMessage = "Error: Invalid load start time format: " + timestamp
+ throw new MalformedCarbonCommandException(errorMessage)
+ }
+ timeObj.asInstanceOf[Long]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 28a9140..c2f06a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -21,22 +21,18 @@ import java.util
import java.util.concurrent._
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.LoadMetadataDetails
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
@@ -48,104 +44,6 @@ object DataManagementFunc {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def deleteLoadByDate(
- sqlContext: SQLContext,
- schema: CarbonDataLoadSchema,
- databaseName: String,
- tableName: String,
- storePath: String,
- dateField: String,
- dateFieldActualName: String,
- dateValue: String) {
-
- val sc = sqlContext
- // Delete the records based on data
- val table = CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)
- val loadMetadataDetailsArray =
- SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
- val resultMap = new CarbonDeleteLoadByDateRDD(
- sc.sparkContext,
- new DeletedLoadResultImpl(),
- databaseName,
- table.getDatabaseName,
- dateField,
- dateFieldActualName,
- dateValue,
- table.getFactTableName,
- tableName,
- storePath,
- loadMetadataDetailsArray).collect.groupBy(_._1)
-
- var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
- if (resultMap.nonEmpty) {
- if (resultMap.size == 1) {
- if (resultMap.contains("")) {
- LOGGER.error("Delete by Date request is failed")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
- val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
- var statusList = resultMap.get(elem.getLoadName)
- // check for the merged load folder.
- if (statusList.isEmpty && null != elem.getMergedLoadName) {
- statusList = resultMap.get(elem.getMergedLoadName)
- }
-
- if (statusList.isDefined) {
- elem.setModificationOrdeletionTimesStamp(CarbonLoaderUtil.readCurrentTime())
- // if atleast on CarbonCommonConstants.MARKED_FOR_UPDATE status exist,
- // use MARKED_FOR_UPDATE
- if (statusList.get
- .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
- } else {
- elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
- updatedLoadMetadataDetailsList += elem
- }
- elem
- } else {
- elem
- }
- }
-
- }
-
- // Save the load metadata
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK
- )
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully got the table metadata file lock")
- if (updatedLoadMetadataDetailsList.nonEmpty) {
- // TODO: Load Aggregate tables after retention.
- }
-
- // write
- CarbonLoaderUtil.writeLoadMetadata(
- schema,
- databaseName,
- table.getDatabaseName,
- updatedloadMetadataDetails.asJava
- )
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("unlock the table metadata file successfully")
- } else {
- LOGGER.error("Unable to unlock the metadata lock")
- }
- }
- } else {
- LOGGER.error("Delete by Date request is failed")
- LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
- sys.error("Delete by Date request is failed, potential causes " +
- "Empty store or Invalid column type, For more details please refer logs.")
- }
- }
-
def executeCompaction(carbonLoadModel: CarbonLoadModel,
storePath: String,
compactionModel: CompactionModel,
@@ -224,7 +122,7 @@ object DataManagementFunc {
*
* @param futureList
*/
- def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
+ private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
loadsToMerge: util
.List[LoadMetadataDetails],
executor: ExecutorService,
@@ -289,21 +187,29 @@ object DataManagementFunc {
}
def deleteLoadsAndUpdateMetadata(
- carbonLoadModel: CarbonLoadModel,
- table: CarbonTable,
+ dbName: String,
+ tableName: String,
storePath: String,
isForceDeletion: Boolean): Unit = {
- if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
- val loadMetadataFilePath = CarbonLoaderUtil
- .extractLoadMetadataFileLocation(carbonLoadModel)
+ if (LoadMetadataUtil.isLoadDeletionRequired(dbName, tableName)) {
+ val loadMetadataFilePath =
+ CarbonLoaderUtil.extractLoadMetadataFileLocation(dbName, tableName)
val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
- val carbonTableStatusLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.TABLE_STATUS_LOCK)
+ val carbonTableStatusLock =
+ CarbonLockFactory.getCarbonLockObj(
+ new CarbonTableIdentifier(dbName, tableName, ""),
+ LockUsage.TABLE_STATUS_LOCK
+ )
// Delete marked loads
- val isUpdationRequired = DeleteLoadFolders
- .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
+ val isUpdationRequired =
+ DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+ dbName,
+ tableName,
+ storePath,
+ isForceDeletion,
+ details
+ )
if (isUpdationRequired) {
try {
@@ -318,14 +224,10 @@ object DataManagementFunc {
val latestStatus = CarbonLoaderUtil
.updateLoadMetadataFromOldToNew(details, latestMetadata)
- CarbonLoaderUtil.writeLoadMetadata(
- carbonLoadModel.getCarbonDataLoadSchema,
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, latestStatus)
+ CarbonLoaderUtil.writeLoadMetadata(storePath, dbName, tableName, latestStatus)
} else {
val errorMsg = "Clean files request is failed for " +
- s"${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }" +
+ s"$dbName.$tableName" +
". Not able to acquire the table status lock due to other operation " +
"running in the background."
LOGGER.audit(errorMsg)
@@ -340,29 +242,24 @@ object DataManagementFunc {
}
def cleanFiles(
- sc: SparkContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String) {
- val table = CarbonMetadata.getInstance.getCarbonTable(
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
- val carbonCleanFilesLock = CarbonLockFactory.getCarbonLockObj(
- table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK)
+ dbName: String,
+ tableName: String,
+ storePath: String): Unit = {
+ val identifier = new CarbonTableIdentifier(dbName, tableName, "")
+ val carbonCleanFilesLock =
+ CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
try {
if (carbonCleanFilesLock.lockWithRetries()) {
LOGGER.info("Clean files lock has been successfully acquired.")
- deleteLoadsAndUpdateMetadata(carbonLoadModel,
- table,
- storePath,
- isForceDeletion = true)
+ deleteLoadsAndUpdateMetadata(dbName, tableName, storePath, isForceDeletion = true)
} else {
val errorMsg = "Clean files request is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+ s"$dbName.$tableName" +
". Not able to acquire the clean files lock due to another clean files " +
"operation is running in the background."
LOGGER.audit(errorMsg)
LOGGER.error(errorMsg)
throw new Exception(errorMsg + " Please try after some time.")
-
}
} finally {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 53a5f67..93194c8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -433,8 +433,8 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
- DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
- isForceDeletion = false)
+ DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, storePath, isForceDeletion = false)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 041b65f..d82290e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,14 +18,13 @@
package org.apache.spark.sql.execution.command
import java.io.File
-import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
import org.apache.spark.sql.hive.CarbonMetastore
@@ -33,9 +32,9 @@ import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
import org.codehaus.jackson.map.ObjectMapper
+import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
@@ -49,9 +48,23 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DataManagementFunc$, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
+object Checker {
+ def validateTableExists(
+ dbName: Option[String],
+ tableName: String,
+ sqlContext: SQLContext): Unit = {
+ val identifier = TableIdentifier(tableName, dbName)
+ if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
+ val err = s"table ${dbName.map(_ + ".").getOrElse("")}$tableName not found"
+ LogServiceFactory.getLogService(this.getClass.getName).error(err)
+ throw new IllegalArgumentException(err)
+ }
+ }
+}
+
/**
* Command for the compaction in alter table command
*
@@ -179,61 +192,14 @@ private[sql] case class DeleteLoadsById(
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
def run(sqlContext: SQLContext): Seq[Row] = {
-
- val databaseName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- LOGGER.audit(s"Delete segment by Id request has been received for $databaseName.$tableName")
-
- // validate load ids first
- validateLoadIds
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore.lookupRelation1(
- identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
-
- if (null == carbonTable) {
- CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- }
- val path = carbonTable.getMetaDataFilepath
-
- try {
- val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
- carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
-
- if (invalidLoadIds.isEmpty) {
-
- LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
- }
- else {
- sys.error("Delete segment by Id is failed. Invalid ID is:" +
- s" ${ invalidLoadIds.mkString(",") }")
- }
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
-
+ Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+ CarbonStore.deleteLoadById(
+ loadids,
+ getDB.getDatabaseName(databaseNameOp, sqlContext),
+ tableName
+ )
Seq.empty
-
- }
-
- // validates load ids
- private def validateLoadIds: Unit = {
- if (loadids.isEmpty) {
- val errorMessage = "Error: Segment id(s) should not be empty."
- throw new MalformedCarbonCommandException(errorMessage)
-
- }
}
}
@@ -243,90 +209,14 @@ private[sql] case class DeleteLoadsByLoadDate(
dateField: String,
loadDate: String) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
-
def run(sqlContext: SQLContext): Seq[Row] = {
-
- LOGGER.audit("The delete segment by load date request has been received.")
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER
- .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
- s"exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val timeObj = Cast(Literal(loadDate), TimestampType).eval()
- if (null == timeObj) {
- val errorMessage = "Error: Invalid load start time format " + loadDate
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(dbName + '_' + tableName)
- if (null == carbonTable) {
- var relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
- }
- val path = carbonTable.getMetaDataFilepath()
-
- try {
- val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
- carbonTable.getAbsoluteTableIdentifier, loadDate, path,
- timeObj.asInstanceOf[java.lang.Long]).asScala
- if (invalidLoadTimestamps.isEmpty) {
- LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
- }
- else {
- sys.error("Delete segment by date is failed. No matching segment found.")
- }
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
+ Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+ CarbonStore.deleteLoadByDate(
+ loadDate,
+ getDB.getDatabaseName(databaseNameOp, sqlContext),
+ tableName
+ )
Seq.empty
-
- }
-
-}
-
-object LoadTable {
-
- def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
-
- // read TableInfo
- val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
- // modify TableInfo
- val columns = tableInfo.getFact_table.getTable_columns
- for (i <- 0 until columns.size) {
- if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
- columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
- }
- }
-
- // write TableInfo
- CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
- // update Metadata
- val catalog = CarbonEnv.get.carbonMetastore
- catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
- // update CarbonDataLoadSchema
- val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
}
}
@@ -511,7 +401,7 @@ case class LoadTable(
carbonLoadModel.setCsvHeader(fileHeader)
carbonLoadModel.setColDictFilePath(columnDict)
carbonLoadModel.setDirectLoad(true)
- GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
+ GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
dataFrame)
@@ -565,6 +455,40 @@ case class LoadTable(
Seq.empty
}
+ private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ model: DictionaryLoadModel,
+ noDictDimension: Array[CarbonDimension]): Unit = {
+
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
+ model.table)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+
+ // read TableInfo
+ val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
+
+ // modify TableInfo
+ val columns = tableInfo.getFact_table.getTable_columns
+ for (i <- 0 until columns.size) {
+ if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
+ columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
+ }
+ }
+
+ // write TableInfo
+ CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
+
+ // update Metadata
+ val catalog = CarbonEnv.get.carbonMetastore
+ catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
+ model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+
+ // update CarbonDataLoadSchema
+ val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
+ model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
+ carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+ }
+
private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
val dimensions = table.getDimensionByTableName(tableName).asScala
if (dateFormat != null) {
@@ -651,60 +575,14 @@ private[sql] case class ShowLoads(
limit: Option[String],
override val output: Seq[Attribute]) extends RunnableCommand {
-
override def run(sqlContext: SQLContext): Seq[Row] = {
- val databaseName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- val tableUniqueName = databaseName + "_" + tableName
- // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
- // schema is changed by other process, so that tableInfoMap woulb be refilled.
- val tableExists = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
- if (!tableExists) {
- sys.error(s"$databaseName.$tableName is not found")
- }
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(tableUniqueName)
- if (carbonTable == null) {
- sys.error(s"$databaseName.$tableName is not found")
- }
- val path = carbonTable.getMetaDataFilepath
- val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
- if (loadMetadataDetailsArray.nonEmpty) {
-
- val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
-
- var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
- (l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
- .parseDouble(l2.getLoadName)
- )
-
-
- if (limit.isDefined) {
- loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
- .filter(load => load.getVisibility.equalsIgnoreCase("true"))
- val limitLoads = limit.get
- try {
- val lim = Integer.parseInt(limitLoads)
- loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
- } catch {
- case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
- }
-
- }
-
- loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
- .map(load =>
- Row(
- load.getLoadName,
- load.getLoadStatus,
- new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
- new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
- } else {
- Seq.empty
-
- }
+ Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+ CarbonStore.showSegments(
+ getDB.getDatabaseName(databaseNameOp, sqlContext),
+ tableName,
+ limit
+ )
}
-
}
private[sql] case class DescribeCommandFormatted(
@@ -786,92 +664,17 @@ private[sql] case class DescribeCommandFormatted(
}
}
-private[sql] case class DeleteLoadByDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- dateValue: String
-) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
- var level: String = ""
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
- .getInstance().getCarbonTable(dbName + '_' + tableName)
- if (relation == null) {
- LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
- filter => filter.name.equalsIgnoreCase(dateField) &&
- filter.dataType.isInstanceOf[TimestampType]).toList
- if (matches.isEmpty) {
- LOGGER.audit("The delete load by date is failed. " +
- s"Table $dbName.$tableName does not contain date field: $dateField")
- sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
- } else {
- level = matches.asJava.get(0).name
- }
- val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
- .getColName
- DataManagementFunc.deleteLoadByDate(
- sqlContext,
- new CarbonDataLoadSchema(carbonTable),
- dbName,
- tableName,
- CarbonEnv.get.carbonMetastore.storePath,
- level,
- actualColName,
- dateValue)
- LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
- Seq.empty
- }
-
-}
-
private[sql] case class CleanFiles(
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
def run(sqlContext: SQLContext): Seq[Row] = {
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier)(sqlContext).
- asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"The clean files request is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
- carbonLoadModel.setTableName(table.getFactTableName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- try {
- DataManagementFunc.cleanFiles(
- sqlContext.sparkContext,
- carbonLoadModel,
- relation.tableMeta.storePath)
- LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
+ Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+ CarbonStore.cleanFiles(
+ getDB.getDatabaseName(databaseNameOp, sqlContext),
+ tableName,
+ sqlContext.asInstanceOf[CarbonContext].storePath
+ )
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 62803c7..9cdbf86 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -148,12 +148,12 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
}
- def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
+ def tableExists(identifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val database = identifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
val tables = metadata.tablesMeta.filter(
c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(identifier.table))
tables.nonEmpty
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
index 0161a45..ed4f95b 100644
--- a/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
+++ b/integration/spark/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.carbondata.core.carbon.datastore.block.Distributable;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 79b1953..de07707 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -431,8 +431,8 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
- DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
- isForceDeletion = false)
+ DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName, storePath, isForceDeletion = false)
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 10fffd9..68ad4d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,17 +17,13 @@
package org.apache.spark.sql.execution.command
-import java.text.SimpleDateFormat
-
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
-import org.apache.spark.sql.types.TimestampType
import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -40,12 +36,11 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, GlobalDictionaryUtil}
/**
@@ -58,7 +53,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def run(sparkSession: SparkSession): Seq[Row] = {
- // TODO : Implement it.
val tableName = alterTableModel.tableName
val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
@@ -142,124 +136,6 @@ case class CreateTable(cm: TableModel) {
}
}
-case class DeleteLoadsById(
- loadids: Seq[String],
- databaseNameOp: Option[String],
- tableName: String) {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
-
- val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Delete segment by Id request has been received for $databaseName.$tableName")
-
- // validate load ids first
- validateLoadIds
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore.lookupRelation(
- identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Delete segment by Id is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
-
- if (null == carbonTable) {
- CarbonEnv.get.carbonMetastore
- .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
- }
- val path = carbonTable.getMetaDataFilepath
-
- try {
- val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
- carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
-
- if (invalidLoadIds.isEmpty) {
-
- LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
- }
- else {
- sys.error("Delete segment by Id is failed. Invalid ID is:" +
- s" ${ invalidLoadIds.mkString(",") }")
- }
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
-
- Seq.empty
-
- }
-
- // validates load ids
- private def validateLoadIds: Unit = {
- if (loadids.isEmpty) {
- val errorMessage = "Error: Segment id(s) should not be empty."
- throw new MalformedCarbonCommandException(errorMessage)
-
- }
- }
-}
-
-case class DeleteLoadsByLoadDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- loadDate: String) {
-
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.TableModel.tableSchema")
-
- def run(sparkSession: SparkSession): Seq[Row] = {
-
- LOGGER.audit("The delete segment by load date request has been received.")
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER
- .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
- s"exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val timeObj = Cast(Literal(loadDate), TimestampType).eval()
- if (null == timeObj) {
- val errorMessage = "Error: Invalid load start time format " + loadDate
- throw new MalformedCarbonCommandException(errorMessage)
- }
-
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(dbName + '_' + tableName)
-
- if (null == carbonTable) {
- var relation = CarbonEnv.get.carbonMetastore
- .lookupRelation(identifier, None)(sparkSession).asInstanceOf[CarbonRelation]
- }
- val path = carbonTable.getMetaDataFilepath()
-
- try {
- val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
- carbonTable.getAbsoluteTableIdentifier, loadDate, path,
- timeObj.asInstanceOf[java.lang.Long]).asScala
- if (invalidLoadTimestamps.isEmpty) {
- LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
- }
- else {
- sys.error("Delete segment by date is failed. No matching segment found.")
- }
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
- Seq.empty
-
- }
-
-}
object LoadTable {
@@ -557,152 +433,3 @@ case class LoadTable(
}
}
}
-
-private[sql] case class DeleteLoadByDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- dateValue: String) {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
- var level: String = ""
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
- .getInstance().getCarbonTable(dbName + '_' + tableName)
- if (relation == null) {
- LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
- filter => filter.name.equalsIgnoreCase(dateField) &&
- filter.dataType.isInstanceOf[TimestampType]).toList
- if (matches.isEmpty) {
- LOGGER.audit("The delete load by date is failed. " +
- s"Table $dbName.$tableName does not contain date field: $dateField")
- sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
- } else {
- level = matches.asJava.get(0).name
- }
- val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
- .getColName
- DataManagementFunc.deleteLoadByDate(
- sparkSession.sqlContext,
- new CarbonDataLoadSchema(carbonTable),
- dbName,
- tableName,
- CarbonEnv.get.carbonMetastore.storePath,
- level,
- actualColName,
- dateValue)
- LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
- Seq.empty
- }
-
-}
-
-case class CleanFiles(
- databaseNameOp: Option[String],
- tableName: String) {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"The clean files request is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
-
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
- carbonLoadModel.setTableName(table.getFactTableName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- try {
- DataManagementFunc.cleanFiles(
- sparkSession.sqlContext.sparkContext,
- carbonLoadModel,
- relation.tableMeta.storePath)
- LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
- } catch {
- case ex: Exception =>
- sys.error(ex.getMessage)
- }
- Seq.empty
- }
-}
-
-case class ShowLoads(
- databaseNameOp: Option[String],
- tableName: String,
- limit: Option[String],
- val output: Seq[Attribute]) {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
- val tableUniqueName = databaseName + "_" + tableName
- // Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
- // schema is changed by other process, so that tableInfoMap woulb be refilled.
- val tableExists = CarbonEnv.get.carbonMetastore
- .tableExists(TableIdentifier(tableName, databaseNameOp))(sparkSession)
- if (!tableExists) {
- sys.error(s"$databaseName.$tableName is not found")
- }
- val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(tableUniqueName)
- if (carbonTable == null) {
- sys.error(s"$databaseName.$tableName is not found")
- }
- val path = carbonTable.getMetaDataFilepath
- val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
- if (loadMetadataDetailsArray.nonEmpty) {
-
- val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
-
- var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
- (l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
- .parseDouble(l2.getLoadName)
- )
-
-
- if (limit.isDefined) {
- loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
- .filter(load => load.getVisibility.equalsIgnoreCase("true"))
- val limitLoads = limit.get
- try {
- val lim = Integer.parseInt(limitLoads)
- loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
- } catch {
- case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
- }
-
- }
-
- loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
- .map(load =>
- Row(
- load.getLoadName,
- load.getLoadStatus,
- new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
- new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
- } else {
- Seq.empty
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 9467804..9638b8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -176,12 +176,14 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
}
- def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+ def tableExists(
+ table: String,
+ databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
checkSchemasModifiedTimeAndReloadTables()
- val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+ val database = databaseOp.getOrElse(sparkSession.catalog.currentDatabase)
val tables = metadata.tablesMeta.filter(
c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(table))
tables.nonEmpty
}
@@ -294,7 +296,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
dbName: String, tableName: String)
(sparkSession: SparkSession): String = {
- if (tableExists(TableIdentifier(tableName, Some(dbName)))(sparkSession)) {
+ if (tableExists(tableName, Some(dbName))(sparkSession)) {
sys.error(s"Table [$tableName] already exists under Database [$dbName]")
}
val schemaConverter = new ThriftWrapperSchemaConverterImpl
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index f493af1..c84882e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,7 +18,8 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.{CleanFiles => TableCleanFiles}
+
+import org.apache.carbondata.api.CarbonStore
/**
* clean files api
@@ -26,21 +27,23 @@ import org.apache.spark.sql.execution.command.{CleanFiles => TableCleanFiles}
// scalastyle:off
object CleanFiles {
- def cleanFiles(spark: SparkSession, dbName: Option[String], tableName: String): Unit = {
- TableCleanFiles(dbName, tableName).run(spark)
+ def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
+ storePath: String): Unit = {
+ TableAPIUtil.validateTableExists(spark, dbName, tableName)
+ CarbonStore.cleanFiles(dbName, tableName, storePath)
}
def main(args: Array[String]): Unit = {
if (args.length < 2) {
- System.err.println("Usage: TableCleanFiles <store path> <table name>");
+ System.err.println("Usage: CleanFiles <store path> <table name>");
System.exit(1)
}
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
- val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+ val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
- cleanFiles(spark, Option(dbName), tableName)
+ cleanFiles(spark, dbName, tableName, storePath)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 90310d3..1e891fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -25,22 +25,23 @@ import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableM
// scalastyle:off
object Compaction {
- def compaction(spark: SparkSession, dbName: Option[String], tableName: String,
+ def compaction(spark: SparkSession, dbName: String, tableName: String,
compactionType: String): Unit = {
- AlterTableCompaction(AlterTableModel(dbName, tableName, compactionType, "")).run(spark)
+ TableAPIUtil.validateTableExists(spark, dbName, tableName)
+ AlterTableCompaction(AlterTableModel(Some(dbName), tableName, compactionType, "")).run(spark)
}
def main(args: Array[String]): Unit = {
if (args.length < 3) {
- System.err.println("Usage: TableCompaction <store path> <table name> <major|minor>");
+ System.err.println("Usage: Compaction <store path> <table name> <major|minor>");
System.exit(1)
}
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
- val spark = TableAPIUtil.spark(storePath, s"TableCompaction: $dbName.$tableName")
+ val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
- compaction(spark, Option(dbName), tableName, compactionType)
+ compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index f77a16e..ae95bf6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -17,7 +17,8 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.DeleteLoadsByLoadDate
+
+import org.apache.carbondata.api.CarbonStore
/**
* delete segments before some date
@@ -25,23 +26,24 @@ import org.apache.spark.sql.execution.command.DeleteLoadsByLoadDate
// scalastyle:off
object DeleteSegmentByDate {
- def deleteSegmentByDate(spark: SparkSession, dbName: Option[String], tableName: String,
+ def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
dateValue: String): Unit = {
- DeleteLoadsByLoadDate(dbName, tableName, "", dateValue).run(spark)
+ TableAPIUtil.validateTableExists(spark, dbName, tableName)
+ CarbonStore.deleteLoadByDate(dateValue, dbName, tableName)
}
def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println(
- "Usage: TableDeleteSegmentByDate <store path> <table name> <before date value>");
+ "Usage: DeleteSegmentByDate <store path> <table name> <before date value>");
System.exit(1)
}
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
- val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+ val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
- deleteSegmentByDate(spark, Option(dbName), tableName, dateValue)
+ deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index c3e8626..d5a6861 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,7 +17,8 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.execution.command.DeleteLoadsById
+
+import org.apache.carbondata.api.CarbonStore
/**
* delete segments by id list
@@ -29,24 +30,25 @@ object DeleteSegmentById {
segmentIds.split(",").toSeq
}
- def deleteSegmentById(spark: SparkSession, dbName: Option[String], tableName: String,
+ def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
segmentIds: Seq[String]): Unit = {
- DeleteLoadsById(segmentIds, dbName, tableName).run(spark)
+ TableAPIUtil.validateTableExists(spark, dbName, tableName)
+ CarbonStore.deleteLoadById(segmentIds, dbName, tableName)
}
def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println(
- "Usage: TableDeleteSegmentByID <store path> <table name> <segment id list>");
+ "Usage: DeleteSegmentByID <store path> <table name> <segment id list>");
System.exit(1)
}
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
- val spark = TableAPIUtil.spark(storePath, s"TableDeleteSegmentById: $dbName.$tableName")
+ val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
- deleteSegmentById(spark, Option(dbName), tableName, segmentIds)
+ deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 5ddffcd..1a02c8c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -17,41 +17,41 @@
package org.apache.spark.util
+import java.text.SimpleDateFormat
+
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.ShowLoads
-import org.apache.spark.sql.types.{StringType, TimestampType}
+
+import org.apache.carbondata.api.CarbonStore
// scalastyle:off
object ShowSegments {
- def showSegments(spark: SparkSession, dbName: Option[String], tableName: String,
+ def showSegments(spark: SparkSession, dbName: String, tableName: String,
limit: Option[String]): Seq[Row] = {
- val output = Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
- AttributeReference("Status", StringType, nullable = false)(),
- AttributeReference("Load Start Time", TimestampType, nullable = false)(),
- AttributeReference("Load End Time", TimestampType, nullable = false)())
- ShowLoads(dbName, tableName, limit: Option[String], output).run(spark)
+ //val databaseName = dbName.getOrElse(spark.catalog.currentDatabase)
+ TableAPIUtil.validateTableExists(spark, dbName, tableName)
+ CarbonStore.showSegments(dbName, tableName, limit)
}
def showString(rows: Seq[Row]): String = {
+ val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.s")
val sb = new StringBuilder
- sb.append("+-----------------+---------------+---------------------+---------------------+\n")
- .append("|SegmentSequenceId|Status |Load Start Time |Load End Time |\n")
- .append("+-----------------+---------------+---------------------+---------------------+\n")
+ sb.append("+------------+------------------+----------------------+----------------------+\n")
+ .append("|SegmentId |Status |Load Start Time |Load End Time |\n")
+ .append("+------------+------------------+----------------------+----------------------+\n")
rows.foreach{row =>
sb.append("|")
- .append(StringUtils.rightPad(row.getString(0), 17))
+ .append(StringUtils.rightPad(row.getString(0), 12))
.append("|")
- .append(StringUtils.rightPad(row.getString(1).substring(0, 15), 15))
+ .append(StringUtils.rightPad(row.getString(1), 18))
.append("|")
- .append(row.getAs[java.sql.Timestamp](2).formatted("yyyy-MM-dd HH:mm:ss.s"))
+ .append(sdf.format(row.getAs[java.sql.Timestamp](2)))
.append("|")
- .append(row.getAs[java.sql.Timestamp](3).formatted("yyyy-MM-dd HH:mm:ss.s"))
+ .append(sdf.format(row.getAs[java.sql.Timestamp](3)))
.append("|\n")
}
- sb.append("+-----------------+---------------+---------------------+---------------------+\n")
+ sb.append("+------------+------------------+----------------------+----------------------+\n")
sb.toString
}
@@ -74,9 +74,9 @@ object ShowSegments {
} else {
None
}
- val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
+ val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
- val rows = showSegments(spark, Option(dbName), tableName, limit)
+ val rows = showSegments(spark, dbName, tableName, limit)
System.out.println(showString(rows))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b50866b3/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index 6954981..cb444d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -17,14 +17,20 @@
package org.apache.spark.util
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* table api util
*/
object TableAPIUtil {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
def parseSchemaName(tableName: String): (String, String) = {
if (tableName.contains(".")) {
val parts = tableName.split(".")
@@ -44,11 +50,22 @@ object TableAPIUtil {
}
def spark(storePath: String, appName: String): SparkSession = {
+ // CarbonEnv depends on CarbonProperty to get the store path, so set it here
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
SparkSession
.builder
.appName(appName)
- .master("local")
- .config(CarbonCommonConstants.STORE_LOCATION, storePath)
.getOrCreate()
}
+
+ def validateTableExists(
+ spark: SparkSession,
+ dbName: String,
+ tableName: String): Unit = {
+ if (!CarbonEnv.get.carbonMetastore.tableExists(tableName, Some(dbName))(spark)) {
+ val err = s"table $dbName.$tableName not found"
+ LOGGER.error(err)
+ throw new MalformedCarbonCommandException(err)
+ }
+ }
}
[3/3] incubator-carbondata git commit: [CARBONDATA-546] Extract data
management command to carbon-spark-common module This closes #452
Posted by ch...@apache.org.
[CARBONDATA-546] Extract data management command to carbon-spark-common module This closes #452
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4a8e15d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4a8e15d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4a8e15d3
Branch: refs/heads/master
Commit: 4a8e15d346399601bb715d4d062eff7185d6382c
Parents: 956eb33 b50866b
Author: chenliang613 <ch...@apache.org>
Authored: Sat Dec 24 22:12:46 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Sat Dec 24 22:12:46 2016 +0800
----------------------------------------------------------------------
.../core/carbon/AbsoluteTableIdentifier.java | 6 +
.../core/carbon/path/CarbonStorePath.java | 6 +-
.../core/carbon/path/CarbonTablePath.java | 35 +-
.../carbondata/examples/CarbonExample.scala | 1 +
.../carbondata/spark/load/CarbonLoaderUtil.java | 19 +-
.../spark/load/DeleteLoadFolders.java | 37 +-
.../carbondata/spark/util/LoadMetadataUtil.java | 7 +-
.../org/apache/carbondata/api/CarbonStore.scala | 176 ++++++++++
.../spark/rdd/DataManagementFunc.scala | 163 ++-------
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 349 ++++---------------
.../apache/spark/sql/hive/CarbonMetastore.scala | 6 +-
.../spark/load/CarbonLoaderUtilTest.java | 1 -
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 275 +--------------
.../apache/spark/sql/hive/CarbonMetastore.scala | 10 +-
.../org/apache/spark/util/CleanFiles.scala | 15 +-
.../org/apache/spark/util/Compaction.scala | 11 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 14 +-
.../apache/spark/util/DeleteSegmentById.scala | 14 +-
.../org/apache/spark/util/ShowSegments.scala | 38 +-
.../org/apache/spark/util/TableAPIUtil.scala | 23 +-
.../spark2/src/test/resources/data_alltypes.csv | 10 +
.../AllDataTypesTestCaseAggregate.scala | 10 +-
.../carbondata/CarbonDataSourceSuite.scala | 3 +-
.../spark/carbondata/util/QueryTest.scala | 66 ----
.../vectorreader/VectorReaderTestCase.scala | 6 +-
.../sql/common/util/CarbonSessionTest.scala | 74 ----
.../spark/sql/common/util/QueryTest.scala | 111 +++++-
.../apache/spark/util/CarbonCommandSuite.scala | 78 +++++
.../lcm/status/SegmentStatusManager.java | 4 +
31 files changed, 608 insertions(+), 968 deletions(-)
----------------------------------------------------------------------