You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/11/12 08:41:04 UTC

[GitHub] [carbondata] kunal642 commented on a change in pull request #4227: [CARBONDATA-4296]: schema evolution, enforcement and deduplication utilities added

kunal642 commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r748068793



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +475,351 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, simply ignore them.
+   * If some columns are missing in source schema as compared to target schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, exception is thrown.
+   * If source schema has multiple columns whose names differ only in case sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    verifyBackwardsCompatibility(targetDs, srcDs)
+
+    val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = lowerCaseSrcSchemaFields
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      if (additionalSourceFields.nonEmpty) {
+        LOGGER.warn(s"source schema contains additional fields which are not present in " +
+                    s"target schema: ${ additionalSourceFields.mkString(",") }")
+      }
+    }
+
+    // check if source schema has fields whose names only differ in case sensitivity
+    val similarFields = lowerCaseSrcSchemaFields.groupBy(a => identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)

Review comment:
       please handle this comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org