You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/06/09 06:03:22 UTC

[GitHub] [carbondata] akashrn5 opened a new pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

akashrn5 opened a new pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148


    ### Why is this PR needed?
    
    
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r676327406



##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
##########
@@ -330,6 +339,11 @@ public void readFields(DataInput in) throws IOException {
         missingSISegments.add(in.readUTF());
       }
     }
+    this.isCDCJob = in.readBoolean();

Review comment:
       Will there be any compatibility issue if the index server has older version of this object (IndexInputFormat) in cache and reading it (deserializing) can give EOF exception as Boolean field was never written in the old version?

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -163,20 +171,36 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * Method to serialize extended blocklet and input split for index server
    * DataFormat
    * <Extended Blocklet data><Carbon input split serializeData length><CarbonInputSplitData>
-   * @param out
-   * @param uniqueLocation
+   * @param out data output to write the primitives to extended blocklet
+   * @param uniqueLocation location to write the blocklet in case of distributed pruning, ex: Lucene
+   * @param isExternalPath identification for the externam segment
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
-      boolean isExternalPath)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation,
+      IndexInputFormat indexInputFormat, boolean isExternalPath)
       throws IOException {
     super.write(out);
-    if (isCountJob) {
+    if (indexInputFormat.isCountStarJob()) {
       // In CarbonInputSplit, getDetailInfo() is a lazy call. we want to avoid this during
       // countStar query. As rowCount is filled inside getDetailInfo(). In countStar case we may
       // not have proper row count. So, always take row count from indexRow.
       out.writeLong(inputSplit.getIndexRow().getInt(BlockletIndexRowIndexes.ROW_COUNT_INDEX));
       out.writeUTF(inputSplit.getSegmentId());
+    } else if (indexInputFormat.getCdcVO() != null) {
+      // In case of CDC, we ust need the filepath and the min max of the blocklet, so just serialize

Review comment:
       ```suggestion
         // In case of CDC, we just need the filepath and the min max of the blocklet, so just serialize
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Each node to be inserted in BlockMinMaxTree for pruning.
+ */
+public class MinMaxNode implements Serializable {
+
+  // list of files present in same range of min max of this node
+  private List<String> filePaths = new ArrayList<>();
+
+  private Object min;
+
+  private Object max;
+
+  private MinMaxNode leftSubTree;

Review comment:
       `leftSubTree` itself has `min` and `max` stored inside, what again storing it in parent node ?

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VO object which contains the info used in CDC case during cache loading in Index server
+ */
+public class CdcVO implements Serializable, Writable {
+
+  /**
+   * This collection contains column to index mapping which give info about the index for a column
+   * in IndexRow object to fetch min max
+   */
+  private Map<String, Integer> columnToIndexMap;
+
+  private List<Integer> indexesToFetch;

Review comment:
       so, when `indexesToFetch` is filled by deserialization, `columnToIndexMap` will be null ?
   
   If any point of time, if both columnToIndexMap and indexesToFetch is filled, then maybe storing once in columnToIndexMap is enough ?

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * This filter executor class will be called when the CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExecutorImpl implements FilterExecutor, ImplicitColumnFilterExecutor {
+
+  private final Set<String> blocksToScan;
+
+  public CDCBlockImplicitExecutorImpl(Set<String> blocksToScan) {
+    this.blocksToScan = blocksToScan;
+  }
+
+  @Override
+  public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
+      String uniqueBlockPath, boolean[] isMinMaxSet) {
+    boolean isScanRequired = false;

Review comment:
       no need for this variable, directly set the bitmap if the path is in the set and return.

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+/**
+ * This expression will be added to Index filter when CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExpression extends Expression {
+
+  Set<String> blocksToScan;
+
+  public CDCBlockImplicitExpression(String blockPathValues) {
+    blocksToScan =
+        Arrays.stream(blockPathValues.split(",")).map(String::trim).collect(Collectors.toSet());
+  }
+
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
+    throw new UnsupportedOperationException("Not allowed on Implicit expression");
+  }
+
+  @Override
+  public ExpressionType getFilterExpressionType() {
+    return ExpressionType.IMPLICIT;
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+

Review comment:
       better to throw unsupported operations ?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -79,113 +87,514 @@ case class CarbonMergeDataSetCommand(
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
-    // Target dataset must be backed by carbondata table.
+    val st = System.currentTimeMillis()
+    val targetDsAliasName = targetDsOri.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
+    val sourceAliasName = srcDS.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
     if (relations.length != 1) {
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
-    // validate the merge matches and actions.
-    validateMergeActions(mergeMatches, targetDsOri, sparkSession)
-    val carbonTable = relations.head.carbonRelation.carbonTable
-    val hasDelAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
-    val hasUpdateAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
-    val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches)
-    // Get all the required columns of targetDS by going through all match conditions and actions.
-    val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
+    // Target dataset must be backed by carbondata table.
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
     // select only the required columns, it can avoid lot of and shuffling.
-    val targetDs = targetDsOri.select(columns: _*)
-    // Update the update mapping with unfilled columns.From here on system assumes all mappings
-    // are existed.
-    mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
-    // Lets generate all conditions combinations as one column and add them as 'status'.
-    val condition = generateStatusColumnWithAllCombinations(mergeMatches)
-
-    // decide join type based on match conditions
-    val joinType = decideJoinType
+    val targetDs = if (mergeMatches == null && operationType != null) {
+      targetDsOri.select(keyColumn)
+    } else {
+      // Get all the required columns of targetDS by going through all match conditions and actions.
+      val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
+      targetDsOri.select(columns: _*)
+    }
+    // decide join type based on match conditions or based on merge operation type
+    val joinType = if (mergeMatches == null && operationType != null) {
+      MergeOperationType.withName(operationType.toUpperCase) match {
+        case MergeOperationType.UPDATE | MergeOperationType.DELETE =>
+          "inner"
+        case MergeOperationType.UPSERT =>
+          "right_outer"
+        case MergeOperationType.INSERT =>
+          null
+      }
+    } else {
+      decideJoinType
+    }
 
-    val joinColumns = mergeMatches.joinExpr.expr.collect {
-      case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
-        // Let's say the join condition will be something like A.id = B.id, then it will be an
-        // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will
-        // be a Literal(B.id). Since we need the column name here, we can directly check the left
-        // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an
-        // ArrayBuffer containing "A" and "id", since "id" is column name, we take
-        // nameparts.tail.head which gives us "id" column name.
-        unresolvedAttribute.nameParts.tail.head
-    }.distinct
+    val joinColumns = if (mergeMatches == null) {
+      Seq(keyColumn)
+    } else {
+      mergeMatches.joinExpr.expr.collect {
+        case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
+          // Let's say the join condition will be something like A.id = B.id, then it will be an
+          // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will
+          // be a Literal(B.id). Since we need the column name here, we can directly check the left
+          // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an
+          // ArrayBuffer containing "A" and "id", since "id" is column name, we take
+          // nameparts.tail.head which gives us "id" column name.
+          unresolvedAttribute.nameParts.tail.head
+      }.distinct
+    }
 
     // repartition the srsDs, if the target has bucketing and the bucketing columns contains join
     // columns
     val repartitionedSrcDs =
-      if (carbonTable.getBucketingInfo != null &&
-          carbonTable.getBucketingInfo
+      if (targetCarbonTable.getBucketingInfo != null &&
+          targetCarbonTable.getBucketingInfo
             .getListOfColumns
             .asScala
             .map(_.getColumnName).containsSlice(joinColumns)) {
-        srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+        srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges,
           joinColumns.map(srcDS.col): _*)
       } else {
       srcDS
+      }
+
+    // cache the source data as we will be scanning multiple times
+    repartitionedSrcDs.cache()

Review comment:
       can you break this function? currently, it is too long and hard to maintain.

##########
File path: core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+  private MinMaxNode root;
+
+  private final boolean isPrimitiveAndNotDate;
+  private final boolean isDimensionColumn;
+  private final DataType joinDataType;
+  private final SerializableComparator comparator;
+
+  public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean isDimensionColumn,
+      DataType joinDataType, SerializableComparator comparator) {
+    this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+    this.isDimensionColumn = isDimensionColumn;
+    this.joinDataType = joinDataType;
+    this.comparator = comparator;
+  }
+
+  public MinMaxNode getRoot() {
+    return root;
+  }
+
+  public void insert(MinMaxNode newMinMaxNode) {
+    root = insert(getRoot(), newMinMaxNode);
+  }
+
+  private MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {
+    /* 1. check if the root null, then insert and make new node
+     * 2. check if the new node completely overlaps with the root, where minCompare and maxCompare
+     * both are zero, if yes add the filepaths and return
+     * 3. if root is less than new node, check if the root has right subtree,
+     *    if(yes) {
+     *       replace the right node with the newnode's min and max based on comparison and then
+     *        call insert with right node as new root and newnode
+     *         insert(root.getRight, newnode)
+     *     } else {
+     *       make the new node as right node and set right node and return
+     *     }
+     * 4. if root is more than new node, check if the root has left subtree,
+     *    if(yes) {
+     *       replace the left node with the newnode's min and max based on comparison and then
+     *        call insert with left node as new root and newnode
+     *         insert(root.getLeft, newnode)
+     *     } else {
+     *       make the new node as left node and set left node and return
+     *     }
+     * */
+    if (root == null) {
+      root = newMinMaxNode;
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) == 0) {
+      root.addFilePats(newMinMaxNode.getFilePaths());
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) < 0) {
+      if (root.getRightSubTree() == null) {
+        root.setRightSubTree(newMinMaxNode);
+        root.setRightSubTreeMax(newMinMaxNode.getMax());
+        root.setRightSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getRightSubTreeMax(), newMinMaxNode.getMax()) < 0) {
+          root.setRightSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getRightSubTreeMin(), newMinMaxNode.getMin()) > 0) {
+          root.setRightSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getRightSubTree(), newMinMaxNode);
+      }
+    } else {
+      if (root.getLeftSubTree() == null) {
+        root.setLeftSubTree(newMinMaxNode);
+        root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        root.setLeftSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getLeftSubTreeMax(), newMinMaxNode.getMax()) < 0) {
+          root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getLeftSubTreeMin(), newMinMaxNode.getMin()) > 0) {
+          root.setLeftSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getLeftSubTree(), newMinMaxNode);
+      }
+    }
+    return root;
+  }
+
+  private int compareNodesBasedOnMinMax(MinMaxNode root, MinMaxNode newMinMaxNode) {
+    int minCompare = compareMinMax(root.getMin(), newMinMaxNode.getMin());
+    int maxCompare = compareMinMax(root.getMax(), newMinMaxNode.getMax());
+    if (minCompare == 0) {
+      return maxCompare;
+    } else {
+      return minCompare;
+    }
+  }
+
+  private int compareMinMax(Object key1, Object key2) {
+    if (isDimensionColumn) {
+      if (isPrimitiveAndNotDate) {
+        return comparator.compare(key1, key2);
+      } else {
+        return ByteUtil.UnsafeComparer.INSTANCE

Review comment:
       why date also using unsafe byte array comparator?
   
   Also if string use unsafe comparator , if not, use provided comparator. can combine below else and above if. 

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -207,19 +231,35 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boo
 
   /**
    * Method to deserialize extended blocklet and input split for index server
-   * @param in
-   * @param locations
-   * @param tablePath
+   * @param in data input stream to read the primitives of extended blocklet
+   * @param locations locations of the input split
+   * @param tablePath carbon table path
    * @throws IOException
    */
   public void deserializeFields(DataInput in, String[] locations, String tablePath,
-      boolean isCountJob)
+      boolean isCountJob, CdcVO cdcVO)
       throws IOException {
     super.readFields(in);
     if (isCountJob) {
       count = in.readLong();
       segmentNo = in.readUTF();
       return;
+    } else if (cdcVO != null) {
+      filePath = in.readUTF();
+      this.columnToMinMaxMapping = new HashMap<>();
+      for (Map.Entry<String, Integer> entry : cdcVO.getColumnToIndexMap().entrySet()) {

Review comment:
       As you just need keys, please use `cdcVO.getColumnToIndexMap().keySet()`
   

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       Merge is supported by SQL also, you can cover that also in the testcase




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-888416589


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/4047/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678023226



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
##########
@@ -286,6 +287,11 @@ object CarbonFilters {
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
         Some(new PolygonRangeListExpression(children.head.toString(), children.last.toString(),
           columnName, instance))
+      case _: BlockPathsUDF =>
+        if (children.size > 1) {
+          throw new MalformedCarbonCommandException("Expect one string in polygon")

Review comment:
       exception message is not proper, please fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887004294


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/4014/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670181335



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],

Review comment:
       javadoc is not complete




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670182419



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) {
+
+  protected def performTagging: (RDD[Row], String) = {
+    val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+      .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val schema =
+      org.apache.spark.sql.types.StructType(Seq(
+        StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType)))
+    val job = CarbonSparkUtil.createHadoopJob()
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val uuid = UUID.randomUUID.toString
+    job.setJobID(new JobID(uuid, 0))
+    val path = targetCarbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
+    FileOutputFormat.setOutputPath(job, new Path(path))
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+    frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = {
+          if (iterator.hasNext) {
+            true
+          } else {
+            writer.close()
+            false
+          }
+        }
+
+        override def next(): InternalRow = {
+          val row = iterator.next()
+          val newArray = new Array[Any](1)
+          val tupleID = row.getUTF8String(tupleId)
+          if (tupleID == null) {
+            insertedRows.add(1)
+          } else {
+            newArray(0) = tupleID
+            writer.write(new GenericInternalRow(newArray))
+            updatedRows.add(1)
+          }
+          null
+        }
+      }
+    }.count()
+    val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+    (deltaRdd, path)
+  }
+
+  protected def triggerAction(factTimestamp: Long,
+      executorErrors: ExecutionErrors,
+      deltaRdd: RDD[Row],
+      deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val tuple = MergeUtil.triggerAction(sparkSession,
+      targetCarbonTable,
+      factTimestamp,
+      executorErrors,
+      deltaRdd)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+    MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple)
+    tuple
+  }
+
+  protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = {
+    val tableCols =
+      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val header = tableCols.mkString(",")
+    val dataFrame = srcDS.select(tableCols.map(col): _*)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
+  }
+
+  protected def tryHorizontalCompaction(): Unit = {
+    // Do IUD Compaction.
+    HorizontalCompaction.tryHorizontalCompaction(
+      sparkSession, targetCarbonTable)
+  }
+
+  def handleMerge()
+}
+
+case class UpdateHandler(sparkSession: SparkSession,

Review comment:
       move param to next line, please follow the coding convention




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677287243



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
##########
@@ -293,5 +293,10 @@ object CarbonSession {
     def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
       new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
     }
+
+    def merge(srcDS: Dataset[Row], keyColumn: String, operationType: String): UpsertBuilder = {

Review comment:
       ok, for the user exposed APIs, I will change to update, delete, upsert and insert. Internally we can call this merge method and make this private and keep other code same for better code handling. This is because internally we need to call the merge command class itself, so I thought we can do it this way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678064902



##########
File path: docs/configuration-parameters.md
##########
@@ -50,7 +50,8 @@ This section provides the details of all the configurations required for the Car
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. |
 | carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. |
 | carbon.trash.retention.days | 7 | This parameter specifies the number of days after which the timestamp based subdirectories are expired in the trash folder. Allowed Min value = 0, Allowed Max Value = 365 days|
-| carbon.clean.file.force.allowed | false | This paramter specifies if the clean files operation with force option is allowed or not.|
+| carbon.clean.file.force.allowed | false | This parameter specifies if the clean files operation with force option is allowed or not.|
+| carbon.cdc.minmax.pruning.enabled | false | This parameter defines whether the min max pruning to be performed on the target table based on the source data. Enable it when data is not sparse across target table and when pruning will be better based on use case.|

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677995169



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -886,16 +888,7 @@ public String getTableTaskInfo(int index) {
     }
   }
 
-  private byte[][] getMinMaxValue(IndexRow row, int index) {
-    IndexRow minMaxRow = row.getRow(index);
-    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
-    for (int i = 0; i < minMax.length; i++) {
-      minMax[i] = minMaxRow.getByteArray(i);
-    }
-    return minMax;
-  }
-
-  private boolean[] getMinMaxFlag(IndexRow row, int index) {
+  private static boolean[] getMinMaxFlag(IndexRow row, int index) {

Review comment:
       is static needed?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670690683



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -895,7 +895,7 @@ public String getTableTaskInfo(int index) {
     return minMax;
   }
 
-  private boolean[] getMinMaxFlag(IndexRow row, int index) {
+  public static boolean[] getMinMaxFlag(IndexRow row, int index) {

Review comment:
       reverted this as not required




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-880981669


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5717/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889184408


   Build Success with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/202/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670182855



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+import org.apache.spark.sql.execution.command.mutation.DeleteExecution
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+
+
+object MergeUtil {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def triggerAction(sparkSession: SparkSession,

Review comment:
       add javadoc for all functions




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875753583


   Build Success with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/75/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678168297



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
##########
@@ -647,18 +647,41 @@ class CarbonScanRDD[T: ClassTag](
 
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
     CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
-    val tableInfo1 = getTableInfo
-    CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    val tableInfo = getTableInfo
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
     if (indexFilter != null) {
-      indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo1))
+      indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo))
+      val children = indexFilter.getExpression.getChildren
+      children.asScala.zipWithIndex.foreach { case (child, index) =>
+        if (child.isInstanceOf[CDCBlockImplicitExpression]) {
+          indexFilter.getExpression.getChildren.set(index, new TrueExpression(null))
+          setCDCExpressionToTrue(indexFilter)

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678168115



##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
##########
@@ -102,6 +103,10 @@
 
   private Set<String> missingSISegments;
 
+  private CdcVO cdcVO;
+
+  private Boolean isCDCJob;

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -886,16 +888,7 @@ public String getTableTaskInfo(int index) {
     }
   }
 
-  private byte[][] getMinMaxValue(IndexRow row, int index) {
-    IndexRow minMaxRow = row.getRow(index);
-    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
-    for (int i = 0; i < minMax.length; i++) {
-      minMax[i] = minMaxRow.getByteArray(i);
-    }
-    return minMax;
-  }
-
-  private boolean[] getMinMaxFlag(IndexRow row, int index) {
+  private static boolean[] getMinMaxFlag(IndexRow row, int index) {

Review comment:
       reverted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875749641


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3927/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887009881


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5755/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-874990865






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-881543571


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3978/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-876318004


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3932/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875047957


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3911/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-874733487


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3910/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-888194287


   @ajantha-bhat  i have updated doc also, please review 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670183936



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -780,6 +780,68 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row("c", "200"), Row("e", "100")))
   }
 
+  test("test new API") {

Review comment:
       only one testcase added? Is there more test for upsert api?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889036976


   there is a conflict after merge #4177, resolving it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677998587



##########
File path: docs/configuration-parameters.md
##########
@@ -50,7 +50,8 @@ This section provides the details of all the configurations required for the Car
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. |
 | carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. |
 | carbon.trash.retention.days | 7 | This parameter specifies the number of days after which the timestamp based subdirectories are expired in the trash folder. Allowed Min value = 0, Allowed Max Value = 365 days|
-| carbon.clean.file.force.allowed | false | This paramter specifies if the clean files operation with force option is allowed or not.|
+| carbon.clean.file.force.allowed | false | This parameter specifies if the clean files operation with force option is allowed or not.|
+| carbon.cdc.minmax.pruning.enabled | false | This parameter defines whether the min max pruning to be performed on the target table based on the source data. Enable it when data is not sparse across target table and when pruning will be better based on use case.|

Review comment:
       Instead of "Enable it ..." please change to "Usefull when.."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875906025


   Build Success with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/77/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670179355



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
##########
@@ -89,7 +89,7 @@ class DistributedIndexJob extends AbstractIndexJob {
         }
         client.getSplits(indexFormat)
           .getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath, indexFormat
-            .getQueryId, indexFormat.isCountStarJob)
+            .getQueryId, indexFormat.isCountStarJob, null)

Review comment:
       Do not change this, but add a new `getExtendedBlocklets` function with one more param




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678093415



##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VO object which contains the info used in CDC case during cache loading in Index server
+ */
+public class CdcVO implements Serializable, Writable {
+
+  /**
+   * This collection contains column to index mapping which give info about the index for a column
+   * in IndexRow object to fetch min max
+   */
+  private Map<String, Integer> columnToIndexMap;
+
+  private List<Integer> indexesToFetch;

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-874729416


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5654/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-886629481


   @akashrn5 : I can see that this optimization is data-specific. But it would be useful if you can also attach a report of how much improvement was observed on top of which dataset. 
   Also, I have only reviewed at a high level as of now. Better to get it reviewed by multiple people. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-888997685


   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678009427



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -198,54 +420,38 @@ case class CarbonMergeDataSetCommand(
       val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
       val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
-      if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString, false, false)) {
-        LOGGER.error("writing of update status file failed")
-        throw new CarbonMergeDataSetException("writing of update status file failed")
-      }
+      MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
+        trxMgr.getLatestTrx, tuple)
       Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
         executorErrors, tuple._2, Option.empty))
     } else {
       None
     }
 
     val dataFrame = loadDF.select(tableCols.map(col): _*)
-    CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName),
-      tableName = carbonTable.getTableName,
-      options = Map("fileheader" -> header),
-      isOverwriteTable = false,
-      dataFrame.queryExecution.logical,
-      carbonTable.getTableInfo,
-      Map.empty,
-      Map.empty,
-      new OperationContext,
-      updateTableModel
-    ).run(sparkSession)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
 
     if (hasDelAction && count == 0) {
-      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-        .getTableStatusFilePath(carbonTable.getTablePath))
-      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-        new Segment(loadMetadataDetail.getMergedLoadName,
-          loadMetadataDetail.getSegmentFile)).toSet.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString,
-        true,
-        true, new util.ArrayList[Segment]())
+      MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx)
     }
     LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
     LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
     LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
     LOGGER.info(
       " Time taken to merge data  :: " + (System.currentTimeMillis() - st))
 
-  // Load the history table if the insert history table action is added by user.
-    HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable,
+    // Load the history table if the insert history table action is added by user.

Review comment:
       check if style is wrong




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670176241



##########
File path: docs/configuration-parameters.md
##########
@@ -50,7 +50,8 @@ This section provides the details of all the configurations required for the Car
 | carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. |
 | carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. |
 | carbon.trash.retention.days | 7 | This parameter specifies the number of days after which the timestamp based subdirectories are expired in the trash folder. Allowed Min value = 0, Allowed Max Value = 365 days|
-| carbon.clean.file.force.allowed | false | This paramter specifies if the clean files operation with force option is allowed or not.|
+| carbon.clean.file.force.allowed | false | This parameter specifies if the clean files operation with force option is allowed or not.|
+| carbon.cdc.minmax.pruning.enabled | false | This parameter defines whether the min max pruning to be performed on the target table based on the source data. enable it when data is not sparsed across target table and when pruning will be better based on use case.|

Review comment:
       ```suggestion
   | carbon.cdc.minmax.pruning.enabled | false | This parameter defines whether the min max pruning to be performed on the target table based on the source data. Enable it when data is not sparse across target table and when pruning will be better based on use case.|
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875509851


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/68/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875515888


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3920/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-880417651






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875607064


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/73/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-880988012


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/121/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670178754



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
##########
@@ -427,6 +427,6 @@ object DistributedRDDUtils {
       new java.util.ArrayList(),
       new java.util.ArrayList())
     new ExtendedBlockletWrapper(blocklets, request.getCarbonTable.getTablePath, request.getQueryId,
-      request.isWriteToFile, request.isCountStarJob)
+      request.isWriteToFile, request.isCountStarJob, request.getCdcVO)

Review comment:
       Refactor constructor of ExtendedBlockletWrapper to pass request directly, so that do not need to change the constructor everytime a new parameter is added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] brijoobopanna commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
brijoobopanna commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-881433563


   retest this please
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-880417420


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5714/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ravipesala commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ravipesala commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677079561



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
##########
@@ -293,5 +293,10 @@ object CarbonSession {
     def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
       new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
     }
+
+    def merge(srcDS: Dataset[Row], keyColumn: String, operationType: String): UpsertBuilder = {

Review comment:
       No need use merge method name and pass the operationType, you can directly name the method names as upsert, insert and delete. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670176448



##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {

Review comment:
       Rename to UpsertExample




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677593241



##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       Please update the document about new API and also when to use new and when to use old. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ravipesala commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ravipesala commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887172274


   @akashrn5  LGTM from high level.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670181749



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],
+      fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+        CarbonColumn)]): Unit = {
+    joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+      val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+        collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+      val joinDataType = joinColumn.getDataType
+      val isDimension = joinColumn.isDimension
+      val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
+                                  (joinDataType != DataTypes.DATE)
+      colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+        filePathMinMiax =>
+          val filePath = filePathMinMiax.getFilePath
+          val minBytes = filePathMinMiax.getMin
+          val maxBytes = filePathMinMiax.getMax
+          val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+            // While data loading to SI created on Partition table, on
+            // partition directory, '/' will be
+            // replaced with '#', to support multi level partitioning. For example, BlockId will be
+            // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId should be
+            // replaced by '#' in place of '/', to match and prune data on SI table.
+            CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+              filePath,
+              "",
+              true,
+              false,
+              true)
+          } else {
+            filePath.substring(filePath.lastIndexOf("/Part") + 1)
+          }
+          if (isDimension) {
+            if (isPrimitiveAndNotDate) {
+              val minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+                joinDataType)
+              val maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+                joinDataType)
+              // check here if present in map, if it is, compare and update min and amx
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin =
+                  comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+                val isMaxMoreThanMax =
+                  comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  minValue,
+                  maxValue,
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+              }
+            } else {
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(fileMinMaxMap(uniqBlockPath)._1
+                                           .asInstanceOf[String].getBytes(), minBytes) > 0
+                val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(maxBytes, fileMinMaxMap(uniqBlockPath)._2
+                                           .asInstanceOf[String].getBytes()) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  new String(minBytes),
+                  new String(maxBytes),
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new String(maxBytes)))
+              }
+            }
+          } else {
+            val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes, joinDataType)
+            val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes, joinDataType)
+            if (fileMinMaxMap.contains(uniqBlockPath)) {
+              val isMinLessThanMin =
+                comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+              val isMaxMoreThanMin =
+                comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+              updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                minValue,
+                maxValue,
+                uniqBlockPath,
+                isMinLessThanMin,
+                isMaxMoreThanMin)
+            } else {
+              fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+            }
+          }
+      }
+      fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+    }
+  }
+
+  /**
+   * This method updates the min max map of the block if the value is less than min or more
+   * than max
+   */
+  private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)],
+      minValue: AnyRef,
+      maxValue: AnyRef,
+      uniqBlockPath: String,
+      isMinLessThanMin: Boolean,
+      isMaxMoreThanMin: Boolean): Unit = {
+    (isMinLessThanMin, isMaxMoreThanMin) match {
+      case (true, true) => fileMinMaxMap(uniqBlockPath) = (minValue, maxValue)
+      case (true, false) => fileMinMaxMap(uniqBlockPath) = (minValue,
+        fileMinMaxMap(uniqBlockPath)._2)
+      case (false, true) => fileMinMaxMap(uniqBlockPath) = (fileMinMaxMap(uniqBlockPath)._1,
+        maxValue)
+      case _ =>
+    }
+  }
+
+  /**
+   * This method returns the partitions required to scan in the target table based on the
+   * partitions present in the src table or dataset
+   */
+  def getPartitionSpecToConsiderForPruning(sparkSession: SparkSession,

Review comment:
       move param to next line, please follow the coding convension




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-876316068


   Build Success with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/80/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875899662


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5673/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875508254


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5664/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670174604



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -886,7 +886,7 @@ public String getTableTaskInfo(int index) {
     }
   }
 
-  private byte[][] getMinMaxValue(IndexRow row, int index) {
+  public static byte[][] getMinMaxValue(IndexRow row, int index) {

Review comment:
       Move it to a utility class

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -895,7 +895,7 @@ public String getTableTaskInfo(int index) {
     return minMax;
   }
 
-  private boolean[] getMinMaxFlag(IndexRow row, int index) {
+  public static boolean[] getMinMaxFlag(IndexRow row, int index) {

Review comment:
       Move it to a utility class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670689780



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -168,7 +175,7 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * @throws IOException
    */
   public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
-      boolean isExternalPath)
+      boolean isExternalPath, CdcVO cdcVO)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -886,7 +886,7 @@ public String getTableTaskInfo(int index) {
     }
   }
 
-  private byte[][] getMinMaxValue(IndexRow row, int index) {
+  public static byte[][] getMinMaxValue(IndexRow row, int index) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -895,7 +895,7 @@ public String getTableTaskInfo(int index) {
     return minMax;
   }
 
-  private boolean[] getMinMaxFlag(IndexRow row, int index) {
+  public static boolean[] getMinMaxFlag(IndexRow row, int index) {

Review comment:
       removed this change, as it was not required




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-880980079


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3975/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-857491553


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5528/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670181583



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],
+      fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+        CarbonColumn)]): Unit = {
+    joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+      val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+        collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+      val joinDataType = joinColumn.getDataType
+      val isDimension = joinColumn.isDimension
+      val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
+                                  (joinDataType != DataTypes.DATE)
+      colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+        filePathMinMiax =>
+          val filePath = filePathMinMiax.getFilePath
+          val minBytes = filePathMinMiax.getMin
+          val maxBytes = filePathMinMiax.getMax
+          val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+            // While data loading to SI created on Partition table, on
+            // partition directory, '/' will be
+            // replaced with '#', to support multi level partitioning. For example, BlockId will be
+            // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId should be
+            // replaced by '#' in place of '/', to match and prune data on SI table.
+            CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+              filePath,
+              "",
+              true,
+              false,
+              true)
+          } else {
+            filePath.substring(filePath.lastIndexOf("/Part") + 1)
+          }
+          if (isDimension) {
+            if (isPrimitiveAndNotDate) {
+              val minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+                joinDataType)
+              val maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+                joinDataType)
+              // check here if present in map, if it is, compare and update min and amx
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin =
+                  comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+                val isMaxMoreThanMax =
+                  comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  minValue,
+                  maxValue,
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+              }
+            } else {
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(fileMinMaxMap(uniqBlockPath)._1
+                                           .asInstanceOf[String].getBytes(), minBytes) > 0
+                val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(maxBytes, fileMinMaxMap(uniqBlockPath)._2
+                                           .asInstanceOf[String].getBytes()) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  new String(minBytes),
+                  new String(maxBytes),
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new String(maxBytes)))
+              }
+            }
+          } else {
+            val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes, joinDataType)
+            val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes, joinDataType)
+            if (fileMinMaxMap.contains(uniqBlockPath)) {
+              val isMinLessThanMin =
+                comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+              val isMaxMoreThanMin =
+                comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+              updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                minValue,
+                maxValue,
+                uniqBlockPath,
+                isMinLessThanMin,
+                isMaxMoreThanMin)
+            } else {
+              fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+            }
+          }
+      }
+      fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+    }
+  }
+
+  /**
+   * This method updates the min max map of the block if the value is less than min or more
+   * than max
+   */
+  private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)],

Review comment:
       move param to next line, please follow the coding convension




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670180960



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
##########
@@ -81,6 +82,7 @@ class CarbonEnv {
 
     sparkSession.udf.register("getTupleId", () => "")
     sparkSession.udf.register("getPositionId", () => "")
+    sparkSession.udf.register("block_paths", new BlockPathsUDF)

Review comment:
       please follow same naming convention




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875697930


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5671/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-888425227


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/195/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-857675115


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3788/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-888422370


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5790/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-875902635


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3929/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ravipesala commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ravipesala commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677079561



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
##########
@@ -293,5 +293,10 @@ object CarbonSession {
     def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
       new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
     }
+
+    def merge(srcDS: Dataset[Row], keyColumn: String, operationType: String): UpsertBuilder = {

Review comment:
       No need use merge method name and pass the operationType, you can directly name the method as 'upsert'




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-876307725


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5676/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-874735006


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/57/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-857671694


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5531/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677995951



##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VO object which contains the info used in CDC case during cache loading in Index server
+ */
+public class CdcVO implements Serializable, Writable {
+
+  /**
+   * This collection contains column to index mapping which give info about the index for a column
+   * in IndexRow object to fetch min max
+   */
+  private Map<String, Integer> columnToIndexMap;
+
+  private List<Integer> indexesToFetch;

Review comment:
       Can you add docs for this variable, it is not clear from the name




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-881542489


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5720/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678001083



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
##########
@@ -647,18 +647,41 @@ class CarbonScanRDD[T: ClassTag](
 
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
     CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
-    val tableInfo1 = getTableInfo
-    CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    val tableInfo = getTableInfo
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
     if (indexFilter != null) {
-      indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo1))
+      indexFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo))
+      val children = indexFilter.getExpression.getChildren
+      children.asScala.zipWithIndex.foreach { case (child, index) =>
+        if (child.isInstanceOf[CDCBlockImplicitExpression]) {
+          indexFilter.getExpression.getChildren.set(index, new TrueExpression(null))
+          setCDCExpressionToTrue(indexFilter)

Review comment:
       please add a comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] asfgit closed pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887595447


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5773/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677125608



##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       no old APIs are required to support actual cdc operations with partial column updates and deletes and inserts. These are to support additional simple upsert operation scenarios




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670175358



##########
File path: core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+  private MinMaxNode root;
+
+  private final boolean isPrimitiveAndNotDate;
+  private final boolean isDimensionColumn;
+  private final DataType joinDataType;
+  private final SerializableComparator comparator;
+
+  public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean isDimensionColumn,
+      DataType joinDataType, SerializableComparator comparator) {
+    this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+    this.isDimensionColumn = isDimensionColumn;
+    this.joinDataType = joinDataType;
+    this.comparator = comparator;
+  }
+
+  public MinMaxNode getRoot() {
+    return root;
+  }
+
+  public void insert(MinMaxNode newMinMaxNode) {
+    root = insert(getRoot(), newMinMaxNode);
+  }
+
+  MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {

Review comment:
       Why is it non-public? Should it be private?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670182419



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) {
+
+  protected def performTagging: (RDD[Row], String) = {
+    val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+      .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val schema =
+      org.apache.spark.sql.types.StructType(Seq(
+        StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType)))
+    val job = CarbonSparkUtil.createHadoopJob()
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val uuid = UUID.randomUUID.toString
+    job.setJobID(new JobID(uuid, 0))
+    val path = targetCarbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
+    FileOutputFormat.setOutputPath(job, new Path(path))
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+    frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = {
+          if (iterator.hasNext) {
+            true
+          } else {
+            writer.close()
+            false
+          }
+        }
+
+        override def next(): InternalRow = {
+          val row = iterator.next()
+          val newArray = new Array[Any](1)
+          val tupleID = row.getUTF8String(tupleId)
+          if (tupleID == null) {
+            insertedRows.add(1)
+          } else {
+            newArray(0) = tupleID
+            writer.write(new GenericInternalRow(newArray))
+            updatedRows.add(1)
+          }
+          null
+        }
+      }
+    }.count()
+    val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+    (deltaRdd, path)
+  }
+
+  protected def triggerAction(factTimestamp: Long,
+      executorErrors: ExecutionErrors,
+      deltaRdd: RDD[Row],
+      deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val tuple = MergeUtil.triggerAction(sparkSession,
+      targetCarbonTable,
+      factTimestamp,
+      executorErrors,
+      deltaRdd)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+    MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple)
+    tuple
+  }
+
+  protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = {
+    val tableCols =
+      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val header = tableCols.mkString(",")
+    val dataFrame = srcDS.select(tableCols.map(col): _*)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
+  }
+
+  protected def tryHorizontalCompaction(): Unit = {
+    // Do IUD Compaction.
+    HorizontalCompaction.tryHorizontalCompaction(
+      sparkSession, targetCarbonTable)
+  }
+
+  def handleMerge()
+}
+
+case class UpdateHandler(sparkSession: SparkSession,

Review comment:
       move param to next line, please follow the coding convention
   please change all places




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-881543130


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/124/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670689968



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -780,6 +780,68 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row("c", "200"), Row("e", "100")))
   }
 
+  test("test new API") {

Review comment:
       updated and added test cases

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+import org.apache.spark.sql.execution.command.mutation.DeleteExecution
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+
+
+object MergeUtil {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def triggerAction(sparkSession: SparkSession,

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) {
+
+  protected def performTagging: (RDD[Row], String) = {
+    val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+      .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val schema =
+      org.apache.spark.sql.types.StructType(Seq(
+        StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType)))
+    val job = CarbonSparkUtil.createHadoopJob()
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val uuid = UUID.randomUUID.toString
+    job.setJobID(new JobID(uuid, 0))
+    val path = targetCarbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
+    FileOutputFormat.setOutputPath(job, new Path(path))
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+    frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = {
+          if (iterator.hasNext) {
+            true
+          } else {
+            writer.close()
+            false
+          }
+        }
+
+        override def next(): InternalRow = {
+          val row = iterator.next()
+          val newArray = new Array[Any](1)
+          val tupleID = row.getUTF8String(tupleId)
+          if (tupleID == null) {
+            insertedRows.add(1)
+          } else {
+            newArray(0) = tupleID
+            writer.write(new GenericInternalRow(newArray))
+            updatedRows.add(1)
+          }
+          null
+        }
+      }
+    }.count()
+    val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+    (deltaRdd, path)
+  }
+
+  protected def triggerAction(factTimestamp: Long,
+      executorErrors: ExecutionErrors,
+      deltaRdd: RDD[Row],
+      deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val tuple = MergeUtil.triggerAction(sparkSession,
+      targetCarbonTable,
+      factTimestamp,
+      executorErrors,
+      deltaRdd)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+    MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple)
+    tuple
+  }
+
+  protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = {
+    val tableCols =
+      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val header = tableCols.mkString(",")
+    val dataFrame = srcDS.select(tableCols.map(col): _*)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
+  }
+
+  protected def tryHorizontalCompaction(): Unit = {
+    // Do IUD Compaction.
+    HorizontalCompaction.tryHorizontalCompaction(
+      sparkSession, targetCarbonTable)
+  }
+
+  def handleMerge()
+}
+
+case class UpdateHandler(sparkSession: SparkSession,

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],
+      fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+        CarbonColumn)]): Unit = {
+    joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+      val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+        collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+      val joinDataType = joinColumn.getDataType
+      val isDimension = joinColumn.isDimension
+      val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
+                                  (joinDataType != DataTypes.DATE)
+      colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+        filePathMinMiax =>
+          val filePath = filePathMinMiax.getFilePath
+          val minBytes = filePathMinMiax.getMin
+          val maxBytes = filePathMinMiax.getMax
+          val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+            // While data loading to SI created on Partition table, on
+            // partition directory, '/' will be
+            // replaced with '#', to support multi level partitioning. For example, BlockId will be
+            // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId should be
+            // replaced by '#' in place of '/', to match and prune data on SI table.
+            CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+              filePath,
+              "",
+              true,
+              false,
+              true)
+          } else {
+            filePath.substring(filePath.lastIndexOf("/Part") + 1)
+          }
+          if (isDimension) {
+            if (isPrimitiveAndNotDate) {
+              val minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+                joinDataType)
+              val maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+                joinDataType)
+              // check here if present in map, if it is, compare and update min and amx
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin =
+                  comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+                val isMaxMoreThanMax =
+                  comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  minValue,
+                  maxValue,
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+              }
+            } else {
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(fileMinMaxMap(uniqBlockPath)._1
+                                           .asInstanceOf[String].getBytes(), minBytes) > 0
+                val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(maxBytes, fileMinMaxMap(uniqBlockPath)._2
+                                           .asInstanceOf[String].getBytes()) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  new String(minBytes),
+                  new String(maxBytes),
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new String(maxBytes)))
+              }
+            }
+          } else {
+            val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes, joinDataType)
+            val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes, joinDataType)
+            if (fileMinMaxMap.contains(uniqBlockPath)) {
+              val isMinLessThanMin =
+                comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+              val isMaxMoreThanMin =
+                comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+              updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                minValue,
+                maxValue,
+                uniqBlockPath,
+                isMinLessThanMin,
+                isMaxMoreThanMin)
+            } else {
+              fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+            }
+          }
+      }
+      fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+    }
+  }
+
+  /**
+   * This method updates the min max map of the block if the value is less than min or more
+   * than max
+   */
+  private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)],
+      minValue: AnyRef,
+      maxValue: AnyRef,
+      uniqBlockPath: String,
+      isMinLessThanMin: Boolean,
+      isMaxMoreThanMin: Boolean): Unit = {
+    (isMinLessThanMin, isMaxMoreThanMin) match {
+      case (true, true) => fileMinMaxMap(uniqBlockPath) = (minValue, maxValue)
+      case (true, false) => fileMinMaxMap(uniqBlockPath) = (minValue,
+        fileMinMaxMap(uniqBlockPath)._2)
+      case (false, true) => fileMinMaxMap(uniqBlockPath) = (fileMinMaxMap(uniqBlockPath)._1,
+        maxValue)
+      case _ =>
+    }
+  }
+
+  /**
+   * This method returns the partitions required to scan in the target table based on the
+   * partitions present in the src table or dataset
+   */
+  def getPartitionSpecToConsiderForPruning(sparkSession: SparkSession,

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],
+      fileMinMaxMapListOfAllJoinColumns: mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+        CarbonColumn)]): Unit = {
+    joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+      val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+        collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+      val joinDataType = joinColumn.getDataType
+      val isDimension = joinColumn.isDimension
+      val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType) &&
+                                  (joinDataType != DataTypes.DATE)
+      colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+        filePathMinMiax =>
+          val filePath = filePathMinMiax.getFilePath
+          val minBytes = filePathMinMiax.getMin
+          val maxBytes = filePathMinMiax.getMax
+          val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+            // While data loading to SI created on Partition table, on
+            // partition directory, '/' will be
+            // replaced with '#', to support multi level partitioning. For example, BlockId will be
+            // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId should be
+            // replaced by '#' in place of '/', to match and prune data on SI table.
+            CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+              filePath,
+              "",
+              true,
+              false,
+              true)
+          } else {
+            filePath.substring(filePath.lastIndexOf("/Part") + 1)
+          }
+          if (isDimension) {
+            if (isPrimitiveAndNotDate) {
+              val minValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+                joinDataType)
+              val maxValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+                joinDataType)
+              // check here if present in map, if it is, compare and update min and amx
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin =
+                  comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+                val isMaxMoreThanMax =
+                  comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  minValue,
+                  maxValue,
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+              }
+            } else {
+              if (fileMinMaxMap.contains(uniqBlockPath)) {
+                val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(fileMinMaxMap(uniqBlockPath)._1
+                                           .asInstanceOf[String].getBytes(), minBytes) > 0
+                val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+                                         .compareTo(maxBytes, fileMinMaxMap(uniqBlockPath)._2
+                                           .asInstanceOf[String].getBytes()) > 0
+                updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                  new String(minBytes),
+                  new String(maxBytes),
+                  uniqBlockPath,
+                  isMinLessThanMin,
+                  isMaxMoreThanMax)
+              } else {
+                fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new String(maxBytes)))
+              }
+            }
+          } else {
+            val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes, joinDataType)
+            val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes, joinDataType)
+            if (fileMinMaxMap.contains(uniqBlockPath)) {
+              val isMinLessThanMin =
+                comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue) > 0
+              val isMaxMoreThanMin =
+                comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2) > 0
+              updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+                minValue,
+                maxValue,
+                uniqBlockPath,
+                isMinLessThanMin,
+                isMaxMoreThanMin)
+            } else {
+              fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+            }
+          }
+      }
+      fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+    }
+  }
+
+  /**
+   * This method updates the min max map of the block if the value is less than min or more
+   * than max
+   */
+  private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)],

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+  /**
+   * This method reads the splits and make (blockPath, (min, max)) tuple to to min max pruning of
+   * the src dataset
+   *
+   * @param colTosplitsFilePathAndMinMaxMap   CarbonInputSplit whose min max cached in driver or
+   *                                          the index server
+   * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath and min max of all the
+   *                                          join columns involved
+   */
+  def addFilePathAndMinMaxTuples(
+      colTosplitsFilePathAndMinMaxMap: mutable.Map[String, util.List[FilePathMinMaxVO]],
+      carbonTable: CarbonTable,
+      joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn, SerializableComparator],

Review comment:
       updated

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
##########
@@ -81,6 +82,7 @@ class CarbonEnv {
 
     sparkSession.udf.register("getTupleId", () => "")
     sparkSession.udf.register("getPositionId", () => "")
+    sparkSession.udf.register("block_paths", new BlockPathsUDF)

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
##########
@@ -89,7 +89,7 @@ class DistributedIndexJob extends AbstractIndexJob {
         }
         client.getSplits(indexFormat)
           .getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath, indexFormat
-            .getQueryId, indexFormat.isCountStarJob)
+            .getQueryId, indexFormat.isCountStarJob, null)

Review comment:
       refactored

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
##########
@@ -427,6 +427,6 @@ object DistributedRDDUtils {
       new java.util.ArrayList(),
       new java.util.ArrayList())
     new ExtendedBlockletWrapper(blocklets, request.getCarbonTable.getTablePath, request.getQueryId,
-      request.isWriteToFile, request.isCountStarJob)
+      request.isWriteToFile, request.isCountStarJob, request.getCdcVO)

Review comment:
       refactored

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,

Review comment:
       the API takes the java list

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+  private MinMaxNode root;
+
+  private final boolean isPrimitiveAndNotDate;
+  private final boolean isDimensionColumn;
+  private final DataType joinDataType;
+  private final SerializableComparator comparator;
+
+  public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean isDimensionColumn,
+      DataType joinDataType, SerializableComparator comparator) {
+    this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+    this.isDimensionColumn = isDimensionColumn;
+    this.joinDataType = joinDataType;
+    this.comparator = comparator;
+  }
+
+  public MinMaxNode getRoot() {
+    return root;
+  }
+
+  public void insert(MinMaxNode newMinMaxNode) {
+    root = insert(getRoot(), newMinMaxNode);
+  }
+
+  MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {

Review comment:
       changed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670183646



##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -780,6 +780,68 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row("c", "200"), Row("e", "100")))
   }
 
+  test("test new API") {

Review comment:
       change name to include more testcase description
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678168587



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(
+    sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) {
+
+  protected def performTagging: (RDD[Row], String) = {
+    val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+      .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val schema =
+      org.apache.spark.sql.types.StructType(Seq(
+        StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType)))
+    val job = CarbonSparkUtil.createHadoopJob()
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val uuid = UUID.randomUUID.toString
+    job.setJobID(new JobID(uuid, 0))
+    val path = targetCarbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
+    FileOutputFormat.setOutputPath(job, new Path(path))
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+    frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = {
+          if (iterator.hasNext) {
+            true
+          } else {
+            writer.close()
+            false
+          }
+        }
+
+        override def next(): InternalRow = {
+          val row = iterator.next()
+          val newArray = new Array[Any](1)
+          val tupleID = row.getUTF8String(tupleId)
+          if (tupleID == null) {
+            insertedRows.add(1)
+          } else {
+            newArray(0) = tupleID
+            writer.write(new GenericInternalRow(newArray))
+            updatedRows.add(1)
+          }
+          null
+        }
+      }
+    }.count()
+    val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+    (deltaRdd, path)
+  }
+
+  protected def triggerAction(
+      factTimestamp: Long,
+      executorErrors: ExecutionErrors,
+      deltaRdd: RDD[Row],
+      deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val tuple = MergeUtil.triggerAction(sparkSession,
+      targetCarbonTable,
+      factTimestamp,
+      executorErrors,
+      deltaRdd)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+    MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple)
+    tuple
+  }
+
+  protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = {
+    val tableCols =
+      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val header = tableCols.mkString(",")
+    val dataFrame = srcDS.select(tableCols.map(col): _*)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
+  }
+
+  protected def tryHorizontalCompaction(): Unit = {
+    // Do IUD Compaction.
+    HorizontalCompaction.tryHorizontalCompaction(
+      sparkSession, targetCarbonTable)
+  }
+
+  def handleMerge()
+}
+
+case class UpdateHandler(
+    sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) {
+
+  override def handleMerge(): Unit = {
+    assert(frame != null)

Review comment:
       added

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
##########
@@ -286,6 +287,11 @@ object CarbonFilters {
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
         Some(new PolygonRangeListExpression(children.head.toString(), children.last.toString(),
           columnName, instance))
+      case _: BlockPathsUDF =>
+        if (children.size > 1) {
+          throw new MalformedCarbonCommandException("Expect one string in polygon")

Review comment:
       yeah, updated now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889031171


   LGTM
   
   we can analyze and handle CARBONDATA-4252 later. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887764067


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/4032/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678168479



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -198,54 +420,38 @@ case class CarbonMergeDataSetCommand(
       val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
       val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
-      if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString, false, false)) {
-        LOGGER.error("writing of update status file failed")
-        throw new CarbonMergeDataSetException("writing of update status file failed")
-      }
+      MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
+        trxMgr.getLatestTrx, tuple)
       Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
         executorErrors, tuple._2, Option.empty))
     } else {
       None
     }
 
     val dataFrame = loadDF.select(tableCols.map(col): _*)
-    CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName),
-      tableName = carbonTable.getTableName,
-      options = Map("fileheader" -> header),
-      isOverwriteTable = false,
-      dataFrame.queryExecution.logical,
-      carbonTable.getTableInfo,
-      Map.empty,
-      Map.empty,
-      new OperationContext,
-      updateTableModel
-    ).run(sparkSession)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
 
     if (hasDelAction && count == 0) {
-      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-        .getTableStatusFilePath(carbonTable.getTablePath))
-      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-        new Segment(loadMetadataDetail.getMergedLoadName,
-          loadMetadataDetail.getSegmentFile)).toSet.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString,
-        true,
-        true, new util.ArrayList[Segment]())
+      MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx)
     }
     LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
     LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
     LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
     LOGGER.info(
       " Time taken to merge data  :: " + (System.currentTimeMillis() - st))
 
-  // Load the history table if the insert history table action is added by user.
-    HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable,
+    // Load the history table if the insert history table action is added by user.

Review comment:
       it was wrong before, corrected now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670690463



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -168,7 +175,7 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * @throws IOException
    */
   public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
-      boolean isExternalPath)
+      boolean isExternalPath, CdcVO cdcVO)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
##########
@@ -886,7 +886,7 @@ public String getTableTaskInfo(int index) {
     }
   }
 
-  private byte[][] getMinMaxValue(IndexRow row, int index) {
+  public static byte[][] getMinMaxValue(IndexRow row, int index) {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678021015



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(
+    sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) {
+
+  protected def performTagging: (RDD[Row], String) = {
+    val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+      .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+    val schema =
+      org.apache.spark.sql.types.StructType(Seq(
+        StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType)))
+    val job = CarbonSparkUtil.createHadoopJob()
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val insertedRows = stats.insertedRows
+    val updatedRows = stats.updatedRows
+    val uuid = UUID.randomUUID.toString
+    job.setJobID(new JobID(uuid, 0))
+    val path = targetCarbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
+    FileOutputFormat.setOutputPath(job, new Path(path))
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
+    val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+    frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = {
+          if (iterator.hasNext) {
+            true
+          } else {
+            writer.close()
+            false
+          }
+        }
+
+        override def next(): InternalRow = {
+          val row = iterator.next()
+          val newArray = new Array[Any](1)
+          val tupleID = row.getUTF8String(tupleId)
+          if (tupleID == null) {
+            insertedRows.add(1)
+          } else {
+            newArray(0) = tupleID
+            writer.write(new GenericInternalRow(newArray))
+            updatedRows.add(1)
+          }
+          null
+        }
+      }
+    }.count()
+    val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+    (deltaRdd, path)
+  }
+
+  protected def triggerAction(
+      factTimestamp: Long,
+      executorErrors: ExecutionErrors,
+      deltaRdd: RDD[Row],
+      deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+    val tuple = MergeUtil.triggerAction(sparkSession,
+      targetCarbonTable,
+      factTimestamp,
+      executorErrors,
+      deltaRdd)
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+    MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable, factTimestamp, tuple)
+    tuple
+  }
+
+  protected def insertDataToTargetTable(updateTableModel: Option[UpdateTableModel]): Seq[Row] = {
+    val tableCols =
+      targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+        filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+    val header = tableCols.mkString(",")
+    val dataFrame = srcDS.select(tableCols.map(col): _*)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
+  }
+
+  protected def tryHorizontalCompaction(): Unit = {
+    // Do IUD Compaction.
+    HorizontalCompaction.tryHorizontalCompaction(
+      sparkSession, targetCarbonTable)
+  }
+
+  def handleMerge()
+}
+
+case class UpdateHandler(
+    sparkSession: SparkSession,
+    frame: DataFrame,
+    targetCarbonTable: CarbonTable,
+    stats: Stats,
+    srcDS: DataFrame) extends MergeHandler(sparkSession, frame, targetCarbonTable, stats, srcDS) {
+
+  override def handleMerge(): Unit = {
+    assert(frame != null)

Review comment:
       add failure message




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [WIP]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-857488610


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/3785/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670178012



##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,

Review comment:
       why asJava?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677106620



##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       better to remove old API ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889188385


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12602/job/ApacheCarbonPRBuilder2.3/5797/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] kunal642 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r677990392



##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
##########
@@ -102,6 +103,10 @@
 
   private Set<String> missingSISegments;
 
+  private CdcVO cdcVO;
+
+  private Boolean isCDCJob;

Review comment:
       no need of this boolean, can simply check if CdcVO is null or not and write/read based on that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889031375


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r676835482



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -79,113 +87,514 @@ case class CarbonMergeDataSetCommand(
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
-    // Target dataset must be backed by carbondata table.
+    val st = System.currentTimeMillis()
+    val targetDsAliasName = targetDsOri.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
+    val sourceAliasName = srcDS.logicalPlan match {
+      case alias: SubqueryAlias =>
+        alias.alias
+      case _ => null
+    }
     if (relations.length != 1) {
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
-    // validate the merge matches and actions.
-    validateMergeActions(mergeMatches, targetDsOri, sparkSession)
-    val carbonTable = relations.head.carbonRelation.carbonTable
-    val hasDelAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
-    val hasUpdateAction = mergeMatches.matchList
-      .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
-    val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches)
-    // Get all the required columns of targetDS by going through all match conditions and actions.
-    val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
+    // Target dataset must be backed by carbondata table.
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
     // select only the required columns, it can avoid lot of and shuffling.
-    val targetDs = targetDsOri.select(columns: _*)
-    // Update the update mapping with unfilled columns.From here on system assumes all mappings
-    // are existed.
-    mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
-    // Lets generate all conditions combinations as one column and add them as 'status'.
-    val condition = generateStatusColumnWithAllCombinations(mergeMatches)
-
-    // decide join type based on match conditions
-    val joinType = decideJoinType
+    val targetDs = if (mergeMatches == null && operationType != null) {
+      targetDsOri.select(keyColumn)
+    } else {
+      // Get all the required columns of targetDS by going through all match conditions and actions.
+      val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession)
+      targetDsOri.select(columns: _*)
+    }
+    // decide join type based on match conditions or based on merge operation type
+    val joinType = if (mergeMatches == null && operationType != null) {
+      MergeOperationType.withName(operationType.toUpperCase) match {
+        case MergeOperationType.UPDATE | MergeOperationType.DELETE =>
+          "inner"
+        case MergeOperationType.UPSERT =>
+          "right_outer"
+        case MergeOperationType.INSERT =>
+          null
+      }
+    } else {
+      decideJoinType
+    }
 
-    val joinColumns = mergeMatches.joinExpr.expr.collect {
-      case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
-        // Let's say the join condition will be something like A.id = B.id, then it will be an
-        // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will
-        // be a Literal(B.id). Since we need the column name here, we can directly check the left
-        // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an
-        // ArrayBuffer containing "A" and "id", since "id" is column name, we take
-        // nameparts.tail.head which gives us "id" column name.
-        unresolvedAttribute.nameParts.tail.head
-    }.distinct
+    val joinColumns = if (mergeMatches == null) {
+      Seq(keyColumn)
+    } else {
+      mergeMatches.joinExpr.expr.collect {
+        case unresolvedAttribute: UnresolvedAttribute if unresolvedAttribute.nameParts.nonEmpty =>
+          // Let's say the join condition will be something like A.id = B.id, then it will be an
+          // EqualTo expression, with left expression as UnresolvedAttribute(A.id) and right will
+          // be a Literal(B.id). Since we need the column name here, we can directly check the left
+          // which is UnresolvedAttribute. We take nameparts from UnresolvedAttribute which is an
+          // ArrayBuffer containing "A" and "id", since "id" is column name, we take
+          // nameparts.tail.head which gives us "id" column name.
+          unresolvedAttribute.nameParts.tail.head
+      }.distinct
+    }
 
     // repartition the srsDs, if the target has bucketing and the bucketing columns contains join
     // columns
     val repartitionedSrcDs =
-      if (carbonTable.getBucketingInfo != null &&
-          carbonTable.getBucketingInfo
+      if (targetCarbonTable.getBucketingInfo != null &&
+          targetCarbonTable.getBucketingInfo
             .getListOfColumns
             .asScala
             .map(_.getColumnName).containsSlice(joinColumns)) {
-        srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges,
+        srcDS.repartition(targetCarbonTable.getBucketingInfo.getNumOfRanges,
           joinColumns.map(srcDS.col): _*)
       } else {
       srcDS
+      }
+
+    // cache the source data as we will be scanning multiple times
+    repartitionedSrcDs.cache()

Review comment:
       yeah, refactored and moved to `CarbonMergeDataSetUtil` class which I have added newly, please check

##########
File path: examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object UPSERTExample {
+
+  def main(args: Array[String]): Unit = {
+    val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+    performUPSERT(spark)
+  }
+
+  def performUPSERT(spark: SparkSession): Unit = {
+    spark.sql("drop table if exists target")
+    val initframe = spark.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = spark.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      spark.createDataFrame(Seq(
+        Row("a", "7"),
+        Row("b", null),
+        Row("g", null),
+        Row("e", "3")
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("value", StringType))))
+    spark.sql("select * from target").show(false)
+    // upsert API updates a and b, inserts e and g
+    target.as("A").merge(cdc.as("B"), "key", "upsert").execute()

Review comment:
       MergeSQL is not yet supported for the new APIs added in this PR, I will create a jira and handle it separately as the effort estimation is not done for that.
   https://issues.apache.org/jira/browse/CARBONDATA-4252

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/filter/executer/CDCBlockImplicitExecutorImpl.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.Set;
+
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * This filter executor class will be called when the CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExecutorImpl implements FilterExecutor, ImplicitColumnFilterExecutor {
+
+  private final Set<String> blocksToScan;
+
+  public CDCBlockImplicitExecutorImpl(Set<String> blocksToScan) {
+    this.blocksToScan = blocksToScan;
+  }
+
+  @Override
+  public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] minValue,
+      String uniqueBlockPath, boolean[] isMinMaxSet) {
+    boolean isScanRequired = false;

Review comment:
       yeah, you are right, changed

##########
File path: core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/CDCBlockImplicitExpression.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+/**
+ * This expression will be added to Index filter when CDC pruning is enabled.
+ */
+public class CDCBlockImplicitExpression extends Expression {
+
+  Set<String> blocksToScan;
+
+  public CDCBlockImplicitExpression(String blockPathValues) {
+    blocksToScan =
+        Arrays.stream(blockPathValues.split(",")).map(String::trim).collect(Collectors.toSet());
+  }
+
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
+    throw new UnsupportedOperationException("Not allowed on Implicit expression");
+  }
+
+  @Override
+  public ExpressionType getFilterExpressionType() {
+    return ExpressionType.IMPLICIT;
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+

Review comment:
       corrected

##########
File path: core/src/main/java/org/apache/carbondata/core/range/MinMaxNode.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Each node to be inserted in BlockMinMaxTree for pruning.
+ */
+public class MinMaxNode implements Serializable {
+
+  // list of files present in same range of min max of this node
+  private List<String> filePaths = new ArrayList<>();
+
+  private Object min;
+
+  private Object max;
+
+  private MinMaxNode leftSubTree;

Review comment:
       this is not exactly the min max of node, its complete min and complete max of the parent node's left and right sub tree

##########
File path: core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+  private MinMaxNode root;
+
+  private final boolean isPrimitiveAndNotDate;
+  private final boolean isDimensionColumn;
+  private final DataType joinDataType;
+  private final SerializableComparator comparator;
+
+  public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean isDimensionColumn,
+      DataType joinDataType, SerializableComparator comparator) {
+    this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+    this.isDimensionColumn = isDimensionColumn;
+    this.joinDataType = joinDataType;
+    this.comparator = comparator;
+  }
+
+  public MinMaxNode getRoot() {
+    return root;
+  }
+
+  public void insert(MinMaxNode newMinMaxNode) {
+    root = insert(getRoot(), newMinMaxNode);
+  }
+
+  private MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {
+    /* 1. check if the root null, then insert and make new node
+     * 2. check if the new node completely overlaps with the root, where minCompare and maxCompare
+     * both are zero, if yes add the filepaths and return
+     * 3. if root is less than new node, check if the root has right subtree,
+     *    if(yes) {
+     *       replace the right node with the newnode's min and max based on comparison and then
+     *        call insert with right node as new root and newnode
+     *         insert(root.getRight, newnode)
+     *     } else {
+     *       make the new node as right node and set right node and return
+     *     }
+     * 4. if root is more than new node, check if the root has left subtree,
+     *    if(yes) {
+     *       replace the left node with the newnode's min and max based on comparison and then
+     *        call insert with left node as new root and newnode
+     *         insert(root.getLeft, newnode)
+     *     } else {
+     *       make the new node as left node and set left node and return
+     *     }
+     * */
+    if (root == null) {
+      root = newMinMaxNode;
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) == 0) {
+      root.addFilePats(newMinMaxNode.getFilePaths());
+      return root;
+    }
+
+    if (compareNodesBasedOnMinMax(root, newMinMaxNode) < 0) {
+      if (root.getRightSubTree() == null) {
+        root.setRightSubTree(newMinMaxNode);
+        root.setRightSubTreeMax(newMinMaxNode.getMax());
+        root.setRightSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getRightSubTreeMax(), newMinMaxNode.getMax()) < 0) {
+          root.setRightSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getRightSubTreeMin(), newMinMaxNode.getMin()) > 0) {
+          root.setRightSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getRightSubTree(), newMinMaxNode);
+      }
+    } else {
+      if (root.getLeftSubTree() == null) {
+        root.setLeftSubTree(newMinMaxNode);
+        root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        root.setLeftSubTreeMin(newMinMaxNode.getMin());
+      } else {
+        if (compareMinMax(root.getLeftSubTreeMax(), newMinMaxNode.getMax()) < 0) {
+          root.setLeftSubTreeMax(newMinMaxNode.getMax());
+        }
+        if (compareMinMax(root.getLeftSubTreeMin(), newMinMaxNode.getMin()) > 0) {
+          root.setLeftSubTreeMin(newMinMaxNode.getMin());
+        }
+        insert(root.getLeftSubTree(), newMinMaxNode);
+      }
+    }
+    return root;
+  }
+
+  private int compareNodesBasedOnMinMax(MinMaxNode root, MinMaxNode newMinMaxNode) {
+    int minCompare = compareMinMax(root.getMin(), newMinMaxNode.getMin());
+    int maxCompare = compareMinMax(root.getMax(), newMinMaxNode.getMax());
+    if (minCompare == 0) {
+      return maxCompare;
+    } else {
+      return minCompare;
+    }
+  }
+
+  private int compareMinMax(Object key1, Object key2) {
+    if (isDimensionColumn) {
+      if (isPrimitiveAndNotDate) {
+        return comparator.compare(key1, key2);
+      } else {
+        return ByteUtil.UnsafeComparer.INSTANCE

Review comment:
       this logic actually same as what we use in query flow. So this is ok, we can combine start if and last else as u suggested, then it wont be clear to understand and later may lead to add multiple checks for string and varchar etc., so this should be fine I think.

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CdcVO.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.mutate;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VO object which contains the info used in CDC case during cache loading in Index server
+ */
+public class CdcVO implements Serializable, Writable {
+
+  /**
+   * This collection contains column to index mapping which give info about the index for a column
+   * in IndexRow object to fetch min max
+   */
+  private Map<String, Integer> columnToIndexMap;
+
+  private List<Integer> indexesToFetch;

Review comment:
       1. yeah, `indexesToFetch ` will be filled in deserialization, `columnToIndexMap` will be null.
   2. Actually `columnToIndexMap` will have column to index map, indexes required during fetching min max during blocklet serialization, column names require during deserialization, since only index required, serializing on that.

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -163,20 +171,36 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * Method to serialize extended blocklet and input split for index server
    * DataFormat
    * <Extended Blocklet data><Carbon input split serializeData length><CarbonInputSplitData>
-   * @param out
-   * @param uniqueLocation
+   * @param out data output to write the primitives to extended blocklet
+   * @param uniqueLocation location to write the blocklet in case of distributed pruning, ex: Lucene
+   * @param isExternalPath identification for the externam segment
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
-      boolean isExternalPath)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation,
+      IndexInputFormat indexInputFormat, boolean isExternalPath)
       throws IOException {
     super.write(out);
-    if (isCountJob) {
+    if (indexInputFormat.isCountStarJob()) {
       // In CarbonInputSplit, getDetailInfo() is a lazy call. we want to avoid this during
       // countStar query. As rowCount is filled inside getDetailInfo(). In countStar case we may
       // not have proper row count. So, always take row count from indexRow.
       out.writeLong(inputSplit.getIndexRow().getInt(BlockletIndexRowIndexes.ROW_COUNT_INDEX));
       out.writeUTF(inputSplit.getSegmentId());
+    } else if (indexInputFormat.getCdcVO() != null) {
+      // In case of CDC, we ust need the filepath and the min max of the blocklet, so just serialize

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -207,19 +231,35 @@ public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boo
 
   /**
    * Method to deserialize extended blocklet and input split for index server
-   * @param in
-   * @param locations
-   * @param tablePath
+   * @param in data input stream to read the primitives of extended blocklet
+   * @param locations locations of the input split
+   * @param tablePath carbon table path
    * @throws IOException
    */
   public void deserializeFields(DataInput in, String[] locations, String tablePath,
-      boolean isCountJob)
+      boolean isCountJob, CdcVO cdcVO)
       throws IOException {
     super.readFields(in);
     if (isCountJob) {
       count = in.readLong();
       segmentNo = in.readUTF();
       return;
+    } else if (cdcVO != null) {
+      filePath = in.readUTF();
+      this.columnToMinMaxMapping = new HashMap<>();
+      for (Map.Entry<String, Integer> entry : cdcVO.getColumnToIndexMap().entrySet()) {

Review comment:
       yeah right, updated

##########
File path: core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
##########
@@ -330,6 +339,11 @@ public void readFields(DataInput in) throws IOException {
         missingSISegments.add(in.readUTF());
       }
     }
+    this.isCDCJob = in.readBoolean();

Review comment:
       When `getSplits` will be called for index server, it will serialize `extendedBlockletWrapper`, so it will be written and deserialize wont be any problem.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-889176645


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_2.4.5/4054/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670172468



##########
File path: core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
##########
@@ -168,7 +175,7 @@ public void setColumnSchema(List<ColumnSchema> columnSchema) {
    * @throws IOException
    */
   public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
-      boolean isExternalPath)
+      boolean isExternalPath, CdcVO cdcVO)

Review comment:
       please add comment for all parameters in javadoc




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887555776


   Build Failed  with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/177/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] jackylk commented on a change in pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
jackylk commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670181919



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,

Review comment:
       move param to next line, please follow the coding convention




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4148: [CARBONDATA-4242]Improve cdc performance and introduce new APIs for UPSERT, DELETE, INSERT and UPDATE

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#issuecomment-887014639


   Build Success with Spark 3.1, Please check CI http://121.244.95.60:12602/job/ApacheCarbon_PR_Builder_3.1/161/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org