You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:03 UTC
[16/50] [abbrv] incubator-carbondata git commit: Added Validation for
delete segments query, load start time parameter (#752)
Added Validation for delete segments query, load start time parameter (#752)
* Added Validation for delete segments, load start time parameter
* fixed review comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/73975b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/73975b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/73975b02
Branch: refs/heads/master
Commit: 73975b02ce8a8e62ad052c08ed712a32f1e02f37
Parents: 148285d
Author: Manu <ma...@gmail.com>
Authored: Sun Jun 26 10:30:53 2016 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Sun Jun 26 10:30:53 2016 +0530
----------------------------------------------------------------------
.../core/load/LoadMetadataDetails.java | 43 ++++++++++++++++++++
.../execution/command/carbonTableSchema.scala | 17 ++++++--
.../dataretention/DataRetentionTestCase.scala | 37 +++++++++++++++++
.../lcm/status/SegmentStatusManager.java | 33 +++++----------
4 files changed, 103 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
index 41edd98..0250b2e 100644
--- a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
@@ -20,6 +20,13 @@
package org.carbondata.core.load;
import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
public class LoadMetadataDetails implements Serializable {
@@ -28,6 +35,15 @@ public class LoadMetadataDetails implements Serializable {
private String loadStatus;
private String loadName;
private String partitionCount;
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
+
+ private static final SimpleDateFormat parser =
+ new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
/**
* Segment modification or deletion time stamp
*/
@@ -126,6 +142,33 @@ public class LoadMetadataDetails implements Serializable {
}
/**
+ * Returns the load start time converted to a numeric timestamp.
+ *
+ * <p>Returns a boxed {@code Long} so that a missing or unparseable start time surfaces as
+ * {@code null} to callers (which can then report the load as invalid) instead of failing
+ * with a NullPointerException during auto-unboxing.
+ *
+ * @return the parsed timestamp, or null if the start time is null, empty or unparseable
+ */
+ public Long getLoadStartTimeAsLong() {
+ return getTimeStamp(loadStartTime);
+ }
+
+ /**
+ * Parses a load start time string with {@code CarbonCommonConstants.CARBON_TIMESTAMP}
+ * and returns the epoch milliseconds multiplied by 1000.
+ *
+ * <p>NOTE(review): the x1000 presumably yields microseconds so the value is comparable
+ * with the Spark TimestampType value supplied by DeleteLoadsByLoadDate — confirm.
+ *
+ * @param loadStartTime the formatted load start time; may be null or empty
+ * @return the timestamp value, or null if the input is null, empty or cannot be parsed
+ */
+ private Long getTimeStamp(String loadStartTime) {
+ if (loadStartTime == null || loadStartTime.isEmpty()) {
+ return null;
+ }
+ // SimpleDateFormat is not thread-safe; use a per-call instance rather than a shared one.
+ SimpleDateFormat formatter = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ try {
+ return formatter.parse(loadStartTime).getTime() * 1000;
+ } catch (ParseException e) {
+ LOGGER.error("Cannot convert " + loadStartTime + " to Time/Long type value: "
+ + e.getMessage());
+ return null;
+ }
+ }
+ /**
* @param loadStartTime
*/
public void setLoadStartTime(String loadStartTime) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 59c8566..99cb9ca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import java.io.File
import java.text.SimpleDateFormat
import java.util
-import java.util.UUID
+import java.util.{Date, UUID}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -29,7 +29,10 @@ import scala.util.Random
import org.apache.spark.SparkEnv
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.TimestampType
@@ -1376,6 +1379,12 @@ private[sql] case class DeleteLoadsByLoadDate(
sys.error(s"Table $schemaName.$tableName does not exist")
}
+ val timeObj = Cast(Literal(loadDate), TimestampType).eval()
+ if(null == timeObj) {
+ val errorMessage = "Error: Invalid load start time format " + loadDate
+ throw new MalformedCarbonCommandException(errorMessage)
+ }
+
var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
.getCarbonTable(schemaName + '_' + tableName)
var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
@@ -1389,8 +1398,8 @@ private[sql] case class DeleteLoadsByLoadDate(
}
var path = carbonTable.getMetaDataFilepath()
-
- var invalidLoadTimestamps = segmentStatusManager.updateDeletionStatus(loadDate, path).asScala
+ var invalidLoadTimestamps = segmentStatusManager
+ .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
LOGGER.audit("Delete load by load date is successfull.")
Seq.empty
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index c749a7e..683dcf8 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -185,5 +185,42 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
sql("select count(*) from caRbon_TabLe_1"), Seq(Row(0)))
}
+ test("RetentionTest_DeleteSegmentsByLoadTimeValiadtion") {
+
+ // A non-numeric year must be rejected as a malformed load start time.
+ val invalidYear = intercept[MalformedCarbonCommandException] {
+ sql(
+ "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" +
+ " 'abcd-01-01 00:00:00'")
+ }
+ assert(invalidYear.getMessage.contains("Invalid load start time format"))
+
+ // ':' instead of '-' between the date fields must also be rejected.
+ val invalidSeparator = intercept[MalformedCarbonCommandException] {
+ sql(
+ "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" +
+ " '2099:01:01 00:00:00'")
+ }
+ assert(invalidSeparator.getMessage.contains("Invalid load start time format"))
+
+ // The rejected statements must leave the table's data unchanged.
+ checkAnswer(
+ sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" +
+ " IN ('china','ind','aus','eng') GROUP BY country"
+ ),
+ Seq(Row("ind", 9))
+ )
+ // A well-formed date is accepted; every segment here was loaded before 2099, so none remain.
+ sql("DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before '2099-01-01'")
+ checkAnswer(
+ sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" +
+ " IN ('china','ind','aus','eng') GROUP BY country"), Seq())
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index 8e4c627..8708779 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -19,7 +19,6 @@
package org.carbondata.lcm.status;
import java.io.*;
-import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -221,23 +220,9 @@ public class SegmentStatusManager {
* @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
* 0 otherwise
*/
- private Integer compareDateStrings(String loadValue, String userValue) {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- SimpleDateFormat defaultSdf =
- new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
- try {
- Date loadDate = sdf.parse(loadValue);
- Date userDate = defaultSdf.parse(userValue);
- if (loadDate.before(userDate)) {
- return -1;
- } else if (loadDate.after(userDate)) {
- return 1;
- }
- return 0;
+ private Integer compareDateValues(Long loadValue, Long userValue) {
- } catch (ParseException pe) {
- return null;
- }
+ // A missing or unparseable value yields null (as the replaced string comparison did), so
+ // callers that check for a null result can report the load instead of hitting an NPE.
+ if (loadValue == null || userValue == null) {
+ return null;
+ }
+ return loadValue.compareTo(userValue);
}
/**
@@ -296,7 +281,8 @@ public class SegmentStatusManager {
* @param tableFolderPath
* @return
*/
- public List<String> updateDeletionStatus(String loadDate, String tableFolderPath) {
+ public List<String> updateDeletionStatus(String loadDate, String tableFolderPath,
+ Long loadStartTime) {
ICarbonLock carbonLock =
CarbonLockFactory.getCarbonLockObj(tableFolderPath, LockUsage.METADATA_LOCK);
List<String> invalidLoadTimestamps = new ArrayList<String>(0);
@@ -320,7 +306,7 @@ public class SegmentStatusManager {
listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray,
- invalidLoadTimestamps);
+ invalidLoadTimestamps, loadStartTime);
if (!invalidLoadTimestamps.isEmpty()) {
LOG.warn("Load doesnt exist or it is already deleted , LoadTimestamps-"
+ invalidLoadTimestamps);
@@ -438,14 +424,15 @@ public class SegmentStatusManager {
* @return invalidLoadTimestamps
*/
public void updateDeletionStatus(String loadDate,
- LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps) {
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
+ Long loadStartTime) {
// For each load timestamp loop through data and if the
// required load timestamp is found then mark
// the metadata as deleted.
boolean loadFound = false;
- String loadStartTime = "Load Start Time: ";
+ String loadStartTimeString = "Load Start Time: ";
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
- Integer result = compareDateStrings(loadMetadata.getLoadStartTime(), loadDate);
+ Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
if (null == result) {
invalidLoadTimestamps.add(loadDate);
} else if (result < 0) {
@@ -454,7 +441,7 @@ public class SegmentStatusManager {
loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(readCurrentTime());
LOG.info("Info: " +
- loadStartTime + loadMetadata.getLoadStartTime() +
+ loadStartTimeString + loadMetadata.getLoadStartTime() +
" Marked for Delete");
} else {
// it is already deleted . can not delete it again.