You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:03 UTC

[16/50] [abbrv] incubator-carbondata git commit: Added Validation for delete segments query, load start time parameter (#752)

Added Validation for delete segments query, load start time parameter (#752)

* Added Validation for delete segments, load start time parameter

* fixed review comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/73975b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/73975b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/73975b02

Branch: refs/heads/master
Commit: 73975b02ce8a8e62ad052c08ed712a32f1e02f37
Parents: 148285d
Author: Manu <ma...@gmail.com>
Authored: Sun Jun 26 10:30:53 2016 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Sun Jun 26 10:30:53 2016 +0530

----------------------------------------------------------------------
 .../core/load/LoadMetadataDetails.java          | 43 ++++++++++++++++++++
 .../execution/command/carbonTableSchema.scala   | 17 ++++++--
 .../dataretention/DataRetentionTestCase.scala   | 37 +++++++++++++++++
 .../lcm/status/SegmentStatusManager.java        | 33 +++++----------
 4 files changed, 103 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
index 41edd98..0250b2e 100644
--- a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
@@ -20,6 +20,13 @@
 package org.carbondata.core.load;
 
 import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.constants.CarbonCommonConstants;
 
 public class LoadMetadataDetails implements Serializable {
 
@@ -28,6 +35,15 @@ public class LoadMetadataDetails implements Serializable {
   private String loadStatus;
   private String loadName;
   private String partitionCount;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
+
+  private static final SimpleDateFormat parser =
+      new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
   /**
    * Segment modification or deletion time stamp
    */
@@ -126,6 +142,33 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   /**
+   * return loadStartTime
+   * @return
+   */
+  public long getLoadStartTimeAsLong() {
+    return getTimeStamp(loadStartTime);
+  }
+
+  /**
+   * Parses a load start time string into a long timestamp value.
+   * <p>
+   * The text is parsed with the shared {@code parser} format and the resulting epoch
+   * milliseconds are multiplied by 1000 (presumably to match the microsecond based value
+   * the delete-segments command passes in - TODO confirm against the caller).
+   *
+   * @param loadStartTime load start time text, may be null or empty
+   * @return the parsed value, or null when the text is null, empty or cannot be parsed
+   */
+  private Long getTimeStamp(String loadStartTime) {
+    if (null == loadStartTime || loadStartTime.isEmpty()) {
+      return null;
+    }
+
+    try {
+      Date parsedDate;
+      // SimpleDateFormat is not thread-safe and this parser instance is static, i.e. shared
+      // by every LoadMetadataDetails object, so concurrent parsing must be serialized.
+      synchronized (parser) {
+        parsedDate = parser.parse(loadStartTime);
+      }
+      return parsedDate.getTime() * 1000;
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert " + loadStartTime + " to Time/Long type value: "
+          + e.getMessage());
+      return null;
+    }
+  }
+  /**
    * @param loadStartTime
    */
   public void setLoadStartTime(String loadStartTime) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 59c8566..99cb9ca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
+import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -29,7 +29,10 @@ import scala.util.Random
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.TimestampType
@@ -1376,6 +1379,12 @@ private[sql] case class DeleteLoadsByLoadDate(
       sys.error(s"Table $schemaName.$tableName does not exist")
     }
 
+    val timeObj = Cast(Literal(loadDate), TimestampType).eval()
+    if(null == timeObj) {
+      val errorMessage = "Error: Invalid load start time format " + loadDate
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
     var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
       .getCarbonTable(schemaName + '_' + tableName)
     var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
@@ -1389,8 +1398,8 @@ private[sql] case class DeleteLoadsByLoadDate(
     }
     var path = carbonTable.getMetaDataFilepath()
 
-
-    var invalidLoadTimestamps = segmentStatusManager.updateDeletionStatus(loadDate, path).asScala
+    var invalidLoadTimestamps = segmentStatusManager
+      .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
     LOGGER.audit("Delete load by load date is successfull.")
     Seq.empty
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index c749a7e..683dcf8 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -185,5 +185,42 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select count(*) from caRbon_TabLe_1"), Seq(Row(0)))
 
   }
+  test("RetentionTest_DeleteSegmentsByLoadTimeValiadtion") { // STARTTIME validation checks
+    // A non-numeric year must be rejected with MalformedCarbonCommandException.
+    try {
+      sql(
+        "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" +
+        " 'abcd-01-01 00:00:00'")
+      assert(false) // reached only when the command did not throw
+    } catch {
+      case e: MalformedCarbonCommandException =>
+        assert(e.getMessage.contains("Invalid load start time format"))
+      case _ => assert(false) // any other failure fails the test
+    }
+    // Colons used as date separators must be rejected the same way.
+    try {
+      sql(
+        "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" +
+        " '2099:01:01 00:00:00'")
+      assert(false) // reached only when the command did not throw
+    } catch {
+      case e: MalformedCarbonCommandException =>
+        assert(e.getMessage.contains("Invalid load start time format"))
+      case _ => assert(false) // any other failure fails the test
+    }
+    // Valid start time: the ("ind", 9) row seen before the delete must be gone after it.
+    checkAnswer(
+      sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" +
+          " IN ('china','ind','aus','eng') GROUP BY country"
+      ),
+      Seq(Row("ind", 9))
+    )
+    sql("DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before '2099-01-01'")
+    checkAnswer(
+      sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" +
+          " IN ('china','ind','aus','eng') GROUP BY country"), Seq())
+
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/73975b02/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index 8e4c627..8708779 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -19,7 +19,6 @@
 package org.carbondata.lcm.status;
 
 import java.io.*;
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -221,23 +220,9 @@ public class SegmentStatusManager {
    * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
    * 0 otherwise
    */
-  private Integer compareDateStrings(String loadValue, String userValue) {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    SimpleDateFormat defaultSdf =
-        new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
-    try {
-      Date loadDate = sdf.parse(loadValue);
-      Date userDate = defaultSdf.parse(userValue);
-      if (loadDate.before(userDate)) {
-        return -1;
-      } else if (loadDate.after(userDate)) {
-        return 1;
-      }
-      return 0;
+  private Integer compareDateValues(Long loadValue, Long userValue) {
 
-    } catch (ParseException pe) {
-      return null;
-    }
+    return loadValue.compareTo(userValue);
   }
 
   /**
@@ -296,7 +281,8 @@ public class SegmentStatusManager {
    * @param tableFolderPath
    * @return
    */
-  public List<String> updateDeletionStatus(String loadDate, String tableFolderPath) {
+  public List<String> updateDeletionStatus(String loadDate, String tableFolderPath,
+      Long loadStartTime) {
     ICarbonLock carbonLock =
         CarbonLockFactory.getCarbonLockObj(tableFolderPath, LockUsage.METADATA_LOCK);
     List<String> invalidLoadTimestamps = new ArrayList<String>(0);
@@ -320,7 +306,7 @@ public class SegmentStatusManager {
         listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
         if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
           updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray,
-              invalidLoadTimestamps);
+              invalidLoadTimestamps, loadStartTime);
           if (!invalidLoadTimestamps.isEmpty()) {
             LOG.warn("Load doesnt exist or it is already deleted , LoadTimestamps-"
                 + invalidLoadTimestamps);
@@ -438,14 +424,15 @@ public class SegmentStatusManager {
    * @return invalidLoadTimestamps
    */
   public void updateDeletionStatus(String loadDate,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps) {
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
+      Long loadStartTime) {
     // For each load timestamp loop through data and if the
     // required load timestamp is found then mark
     // the metadata as deleted.
     boolean loadFound = false;
-    String loadStartTime = "Load Start Time: ";
+    String loadStartTimeString = "Load Start Time: ";
     for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-      Integer result = compareDateStrings(loadMetadata.getLoadStartTime(), loadDate);
+      Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
       if (null == result) {
         invalidLoadTimestamps.add(loadDate);
       } else if (result < 0) {
@@ -454,7 +441,7 @@ public class SegmentStatusManager {
           loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
           loadMetadata.setModificationOrdeletionTimesStamp(readCurrentTime());
           LOG.info("Info: " +
-              loadStartTime + loadMetadata.getLoadStartTime() +
+              loadStartTimeString + loadMetadata.getLoadStartTime() +
               " Marked for Delete");
         } else {
           // it is already deleted . can not delete it again.