Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/04/13 16:00:20 UTC

[carbondata] branch master updated: [CARBONDATA-3771] Fixed date filter issue and use hive constants instead of hard-coded values

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e830c5  [CARBONDATA-3771] Fixed date filter issue and use hive constants instead of hard-coded values
0e830c5 is described below

commit 0e830c583bfdbb81a700e0bb2803f61b3bd3b63a
Author: kunal642 <ku...@gmail.com>
AuthorDate: Sun Jan 12 02:30:15 2020 +0530

    [CARBONDATA-3771] Fixed date filter issue and use hive constants instead of hard-coded values
    
    Why is this PR needed?
    Filters on a date column give wrong results because the Hive filter is converted into an incorrect Carbon expression.
    
    What changes were proposed in this PR?
    For date columns, getExprString() returns the filter value as 'DATE2020-01-01', which causes the filter to fail.
    Changed to ((ExprNodeConstantDesc) exprNodeDesc).getValue().toString() for ExprNodeConstantDesc so the raw literal value is used. Hard-coded Hive property names are also replaced with constants from hive_metastoreConstants.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes, testFilterOnDate is added in Hive2CarbonExpressionTest.
    
    This closes #3705
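    
    For illustration, a minimal sketch of the behaviour described above. It assumes hive-exec is on the classpath and builds a date constant the same way as the new testFilterOnDate test; the class name DateLiteralSketch is only a placeholder and is not part of this commit.
    
    import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    
    public class DateLiteralSketch {
    
      // Mirrors the helper added by this commit: constants expose their raw value,
      // everything else falls back to the quote-stripped expression string.
      private static String getExpressionValue(ExprNodeDesc exprNodeDesc) {
        if (exprNodeDesc instanceof ExprNodeConstantDesc) {
          return ((ExprNodeConstantDesc) exprNodeDesc).getValue().toString();
        }
        return exprNodeDesc.getExprString().replace("'", "");
      }
    
      public static void main(String[] args) {
        ExprNodeConstantDesc dateLiteral =
            new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, "2020-01-01");
    
        // Old behaviour: getExprString() renders the typed literal (e.g. DATE'2020-01-01'),
        // so stripping quotes yields "DATE2020-01-01" and the Carbon filter matches nothing.
        System.out.println(dateLiteral.getExprString().replace("'", ""));
    
        // New behaviour: the raw constant value "2020-01-01" is passed to the
        // LiteralExpression, so the date filter returns the expected rows.
        System.out.println(getExpressionValue(dateLiteral));
      }
    }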
---
 .../carbondata/core/metadata/SegmentFileStore.java |  18 ++-
 .../carbondata/core/util/CarbonProperties.java     |   8 +-
 .../core/writer/CarbonIndexFileMergeWriter.java    |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java         |   3 +-
 .../carbondata/hive/Hive2CarbonExpression.java     |  39 ++++---
 .../carbondata/hive/MapredCarbonInputFormat.java   |  28 ++---
 .../hive/MapredCarbonOutputCommitter.java          | 122 +++++++++++++++++++++
 .../carbondata/hive/MapredCarbonOutputFormat.java  |  31 ++++--
 .../carbondata/hive/util/HiveCarbonUtil.java       |  45 ++++----
 .../carbondata/hive/Hive2CarbonExpressionTest.java |  19 ++++
 .../processing/loading/DataLoadExecutor.java       |  16 ---
 11 files changed, 244 insertions(+), 87 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 7aa2843..b0c1442 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -65,6 +65,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -320,6 +321,21 @@ public class SegmentFileStore {
     return false;
   }
 
+  public static void mergeIndexAndWriteSegmentFile(CarbonTable carbonTable, String segmentId,
+      String UUID) {
+    String tablePath = carbonTable.getTablePath();
+    String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
+    try {
+      SegmentFileStore sfs = new SegmentFileStore(tablePath, segmentFileName);
+      List<CarbonFile> carbonIndexFiles = sfs.getIndexCarbonFiles();
+      new CarbonIndexFileMergeWriter(carbonTable)
+          .writeMergeIndexFileBasedOnSegmentFile(segmentId, null, sfs,
+              carbonIndexFiles.toArray(new CarbonFile[carbonIndexFiles.size()]), UUID, null);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Write segment file to the metadata folder of the table selecting only the current load files
    *
@@ -1250,7 +1266,7 @@ public class SegmentFileStore {
      */
     private String segmentMetaDataInfo;
 
-    SegmentFile() {
+    public SegmentFile() {
       locationMap = new HashMap<>();
     }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index eccbc57..36b8f90 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -855,7 +855,13 @@ public final class CarbonProperties {
    * Return the store path
    */
   public static String getStorePath() {
-    return getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
+    String storePath = getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
+    if (storePath == null) {
+      // Internally spark sets the value of spark.warehouse.dir to hive.metastore.warehouse.dir.
+      // So no need to check for spark property.
+      storePath = FileFactory.getConfiguration().get("hive.metastore.warehouse.dir");
+    }
+    return storePath;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index abb12ce..e4f05f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -202,7 +202,7 @@ public class CarbonIndexFileMergeWriter {
     return null;
   }
 
-  private String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
+  public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
       List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore,
       CarbonFile[] indexFiles, String uuid, String partitionPath) throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 0a22e25..193479a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -89,7 +89,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
   private static final Logger LOG =
       LogServiceFactory.getLogService(CarbonTableInputFormat.class.getName());
-  public static final String CARBON_TRANSACTIONAL_TABLE =
+  protected static final String CARBON_TRANSACTIONAL_TABLE =
       "mapreduce.input.carboninputformat.transactional";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
@@ -354,7 +354,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private List<InputSplit> getSplits(JobContext job, IndexFilter expression,
       List<Segment> validSegments, SegmentUpdateStatusManager updateStatusManager,
       List<Segment> invalidSegments) throws IOException {
-
     List<String> segmentsToBeRefreshed = new ArrayList<>();
     if (!CarbonProperties.getInstance()
         .isDistributedPruningEnabled(carbonTable.getDatabaseName(), carbonTable.getTableName())) {
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/Hive2CarbonExpression.java b/integration/hive/src/main/java/org/apache/carbondata/hive/Hive2CarbonExpression.java
index 7a0fa30..cc1052e 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/Hive2CarbonExpression.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/Hive2CarbonExpression.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.hadoop.api.CarbonInputFormat;
 import org.apache.carbondata.hive.util.DataTypeUtil;
 
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -65,6 +66,14 @@ public class Hive2CarbonExpression {
   private static final Logger LOG =
       LogServiceFactory.getLogService(CarbonInputFormat.class.getName());
 
+  private static String getExpressionValue(ExprNodeDesc exprNodeDesc) {
+    if (exprNodeDesc instanceof ExprNodeConstantDesc) {
+      return ((ExprNodeConstantDesc) exprNodeDesc).getValue().toString();
+    } else {
+      return exprNodeDesc.getExprString().replace("'", "");
+    }
+  }
+
   public static Expression convertExprHive2Carbon(ExprNodeDesc exprNodeDesc) {
     if (exprNodeDesc instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc exprNodeGenericFuncDesc = (ExprNodeGenericFuncDesc) exprNodeDesc;
@@ -75,7 +84,7 @@ public class Hive2CarbonExpression {
             getDateType(l1.get(left).getTypeString()));
         List<Expression> listExpr = new ArrayList<>();
         for (int i = right; i < l1.size(); i++) {
-          LiteralExpression literalExpression = new LiteralExpression(l1.get(i).getExprString(),
+          LiteralExpression literalExpression = new LiteralExpression(getExpressionValue(l1.get(i)),
               getDateType(l1.get(left).getTypeString()));
           listExpr.add(literalExpression);
         }
@@ -93,7 +102,6 @@ public class Hive2CarbonExpression {
         Expression rightExpression =
             convertExprHive2Carbon(exprNodeGenericFuncDesc.getChildren().get(right));
         return new AndExpression(leftExpression, rightExpression);
-
       } else if (udf instanceof GenericUDFOPEqual) {
         ColumnExpression columnExpression = null;
         if (l1.get(left) instanceof ExprNodeFieldDesc) {
@@ -103,38 +111,43 @@ public class Hive2CarbonExpression {
               getDateType(l1.get(left).getTypeString()));
         }
         LiteralExpression literalExpression =
-            new LiteralExpression(l1.get(right).getExprString().replace("'", ""),
+            new LiteralExpression(getExpressionValue(l1.get(right)),
                 getDateType(l1.get(right).getTypeString()));
         return new EqualToExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPEqualOrGreaterThan) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
             getDateType(l1.get(left).getTypeString()));
-        LiteralExpression literalExpression = new LiteralExpression(l1.get(right).getExprString(),
-            getDateType(l1.get(left).getTypeString()));
+        LiteralExpression literalExpression =
+            new LiteralExpression(getExpressionValue(l1.get(right)),
+                getDateType(l1.get(left).getTypeString()));
         return new GreaterThanEqualToExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPGreaterThan) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
             getDateType(l1.get(left).getTypeString()));
-        LiteralExpression literalExpression = new LiteralExpression(l1.get(right).getExprString(),
-            getDateType(l1.get(left).getTypeString()));
+        LiteralExpression literalExpression =
+            new LiteralExpression(getExpressionValue(l1.get(right)),
+                getDateType(l1.get(left).getTypeString()));
         return new GreaterThanExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPNotEqual) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
             getDateType(l1.get(left).getTypeString()));
-        LiteralExpression literalExpression = new LiteralExpression(l1.get(right).getExprString(),
-            getDateType(l1.get(left).getTypeString()));
+        LiteralExpression literalExpression =
+            new LiteralExpression(getExpressionValue(l1.get(right)),
+                getDateType(l1.get(left).getTypeString()));
         return new NotEqualsExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPEqualOrLessThan) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
             getDateType(l1.get(left).getTypeString()));
-        LiteralExpression literalExpression = new LiteralExpression(l1.get(right).getExprString(),
-            getDateType(l1.get(left).getTypeString()));
+        LiteralExpression literalExpression =
+            new LiteralExpression(getExpressionValue(l1.get(right)),
+                getDateType(l1.get(left).getTypeString()));
         return new LessThanEqualToExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPLessThan) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
             getDateType(l1.get(left).getTypeString()));
-        LiteralExpression literalExpression = new LiteralExpression(l1.get(right).getExprString(),
-            getDateType(l1.get(left).getTypeString()));
+        LiteralExpression literalExpression =
+            new LiteralExpression(getExpressionValue(l1.get(right)),
+                getDateType(l1.get(left).getTypeString()));
         return new LessThanExpression(columnExpression, literalExpression);
       } else if (udf instanceof GenericUDFOPNull) {
         ColumnExpression columnExpression = new ColumnExpression(l1.get(left).getCols().get(left),
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 2c1c24d..be2ec10 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -58,7 +59,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
 public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritable>
@@ -73,29 +73,15 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
    */
   private static void populateCarbonTable(Configuration configuration, String paths)
       throws IOException, InvalidConfigurationException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    String validInputPath = null;
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
-    } else {
-      if (paths != null) {
-        for (String inputPath : inputPaths) {
-          inputPath = inputPath.replace("file:", "");
-          if (FileFactory.isFileExist(inputPath)) {
-            validInputPath = inputPath;
-            break;
-          }
-        }
-      }
-    }
     if (null != paths) {
       // read the schema file to get the absoluteTableIdentifier having the correct table id
       // persisted in the schema
       CarbonTable carbonTable;
       AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
-          .from(validInputPath, getDatabaseName(configuration), getTableName(configuration));
-      String schemaPath = CarbonTablePath.getSchemaFilePath(validInputPath);
+          .from(configuration.get(hive_metastoreConstants.META_TABLE_LOCATION),
+              getDatabaseName(configuration), getTableName(configuration));
+      String schemaPath =
+          CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath(), configuration);
       if (FileFactory.getCarbonFile(schemaPath).exists()) {
         // read the schema file to get the absoluteTableIdentifier having the correct table id
         // persisted in the schema
@@ -129,7 +115,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
     CarbonTable carbonTable;
     try {
       carbonTable = getCarbonTable(jobContext.getConfiguration(),
-          jobContext.getConfiguration().get("location"));
+          jobContext.getConfiguration().get(hive_metastoreConstants.META_TABLE_LOCATION));
     } catch (Exception e) {
       throw new IOException("Unable read Carbon Schema: ", e);
     }
@@ -151,7 +137,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
     CarbonInputFormat<Void> carbonInputFormat;
     if (carbonTable.isTransactionalTable()) {
       carbonInputFormat = new CarbonTableInputFormat<>();
-      jobContext.getConfiguration().set(CarbonTableInputFormat.CARBON_TRANSACTIONAL_TABLE, "true");
+      jobContext.getConfiguration().set(CARBON_TRANSACTIONAL_TABLE, "true");
     } else {
       carbonInputFormat = new CarbonFileInputFormat<>();
     }
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
new file mode 100644
index 0000000..500764b
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonOutputCommitter;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+
+public class MapredCarbonOutputCommitter extends OutputCommitter {
+
+  private CarbonOutputCommitter carbonOutputCommitter;
+
+  private final Logger LOGGER =
+      LogServiceFactory.getLogService(this.getClass().getName());
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+    Random random = new Random();
+    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
+    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
+    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
+    org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
+        new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
+    CarbonLoadModel carbonLoadModel =
+        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    carbonOutputCommitter =
+        new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()), context);
+    carbonOutputCommitter.setupJob(jobContext);
+    String loadModelStr = jobContext.getConfiguration().get("mapreduce.carbontable.load.model");
+    jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV,
+        a + ",carbon=" + loadModelStr);
+    jobContext.getJobConf().set(JobConf.MAPRED_REDUCE_TASK_ENV,
+        a + ",carbon=" + loadModelStr);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) throws IOException {
+    if (carbonOutputCommitter != null) {
+      carbonOutputCommitter.abortJob(jobContext, JobStatus.State.FAILED);
+      throw new RuntimeException("Failed to commit Job");
+    }
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    try {
+      Configuration configuration = jobContext.getConfiguration();
+      CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
+      ThreadLocalSessionInfo.unsetAll();
+      SegmentFileStore.writeSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
+          carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
+      SegmentFileStore
+          .mergeIndexAndWriteSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
+              carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
+      CarbonTableOutputFormat.setLoadModel(configuration, carbonLoadModel);
+      carbonOutputCommitter.commitJob(jobContext);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index 3d901ec..3a8c7a7 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -22,8 +22,10 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
@@ -47,6 +49,11 @@ import org.apache.hadoop.util.Progressable;
 public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
     implements HiveOutputFormat<Void, T>, OutputFormat<Void, T> {
 
+  static {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "hive");
+  }
+
   @Override
   public RecordWriter<Void, T> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s,
       Progressable progressable) throws IOException {
@@ -61,24 +68,32 @@ public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
   public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jc);
     CarbonLoadModel carbonLoadModel = null;
+    // Try to get loadmodel from JobConf.
     String encodedString = jc.get(LOAD_MODEL);
     if (encodedString != null) {
       carbonLoadModel =
           (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
-    }
-    if (carbonLoadModel == null) {
-      carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(tableProperties, jc);
     } else {
-      for (Map.Entry<Object, Object> entry : tableProperties.entrySet()) {
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getTableInfo().getFactTable()
-            .getTableProperties().put(entry.getKey().toString().toLowerCase(),
-            entry.getValue().toString().toLowerCase());
+      // Try to get loadmodel from Container environment.
+      encodedString = System.getenv("carbon");
+      if (encodedString != null) {
+        carbonLoadModel =
+            (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+      } else {
+        carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(tableProperties, jc);
       }
     }
+    for (Map.Entry<Object, Object> entry : tableProperties.entrySet()) {
+      carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getTableInfo().getFactTable()
+          .getTableProperties()
+          .put(entry.getKey().toString().toLowerCase(), entry.getValue().toString().toLowerCase());
+    }
     String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
     TaskAttemptContextImpl context = new TaskAttemptContextImpl(jc, taskAttemptID);
+    carbonLoadModel.setTaskNo("" + taskAttemptID.getTaskID().getId());
     final boolean isHivePartitionedTable =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable();
     PartitionInfo partitionInfo =
@@ -90,7 +105,7 @@ public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
       carbonLoadModel.getOutputFilesInfoHolder().addToPartitionPath(finalOutputPath);
       context.getConfiguration().set("carbon.outputformat.writepath", finalOutputPath);
     }
-    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), carbonLoadModel);
+    CarbonTableOutputFormat.setLoadModel(jc, carbonLoadModel);
     org.apache.hadoop.mapreduce.RecordWriter<NullWritable, ObjectArrayWritable> re =
         super.getRecordWriter(context);
     return new FileSinkOperator.RecordWriter() {
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index 50be79c..b611edb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -61,6 +60,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.log4j.Logger;
 
 public class HiveCarbonUtil {
@@ -68,17 +68,18 @@ public class HiveCarbonUtil {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(HiveCarbonUtil.class.getName());
 
-  public static CarbonLoadModel getCarbonLoadModel(Configuration tableProperties)
-      throws IOException {
+  public static CarbonLoadModel getCarbonLoadModel(Configuration tableProperties) {
     String[] tableUniqueName = tableProperties.get("name").split("\\.");
     String databaseName = tableUniqueName[0];
     String tableName = tableUniqueName[1];
-    String tablePath = tableProperties.get("location");
-    String columns = tableProperties.get("columns");
+    String tablePath = tableProperties.get(hive_metastoreConstants.META_TABLE_LOCATION);
+    String columns = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMNS);
     String sortColumns = tableProperties.get("sort_columns");
-    String columnTypes = tableProperties.get("columns.types");
-    String partitionColumns = tableProperties.get("partition_columns");
-    String partitionColumnTypes = tableProperties.get("partition_columns.types");
+    String columnTypes = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+    String partitionColumns =
+        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    String partitionColumnTypes =
+        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
     if (partitionColumns != null) {
       columns = columns + "," + partitionColumns;
       columnTypes = columnTypes + ":" + partitionColumnTypes;
@@ -97,12 +98,12 @@ public class HiveCarbonUtil {
   }
 
   public static CarbonLoadModel getCarbonLoadModel(Properties tableProperties,
-      Configuration configuration) throws IOException {
+      Configuration configuration) {
     String[] tableUniqueName = tableProperties.getProperty("name").split("\\.");
     String databaseName = tableUniqueName[0];
     String tableName = tableUniqueName[1];
-    String tablePath = tableProperties.getProperty("location");
-    String columns = tableProperties.getProperty("columns");
+    String tablePath = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+    String columns = tableProperties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
     String sortColumns = tableProperties.getProperty("sort_columns");
     String[] columnTypes = splitSchemaStringToArray(tableProperties.getProperty("columns.types"));
     String complexDelim = tableProperties.getProperty("complex_delimiter", "");
@@ -117,7 +118,7 @@ public class HiveCarbonUtil {
 
   public static CarbonLoadModel getCarbonLoadModel(String tableName, String databaseName,
       String location, String sortColumnsString, String[] columns, String[] columnTypes,
-      Configuration configuration) throws IOException {
+      Configuration configuration) {
     CarbonLoadModel loadModel;
     CarbonTable carbonTable;
     try {
@@ -146,7 +147,7 @@ public class HiveCarbonUtil {
     options.put("fileheader", Strings.mkString(columns, ","));
     try {
       loadModel = carbonLoadModelBuilder.build(options, System.currentTimeMillis(), "");
-    } catch (InvalidLoadOptionException e) {
+    } catch (InvalidLoadOptionException | IOException e) {
       throw new RuntimeException(e);
     }
     loadModel.setSkipParsers();
@@ -201,21 +202,16 @@ public class HiveCarbonUtil {
 
   private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
     ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    CarbonFile schemaFile =
-        FileFactory.getCarbonFile(CarbonTablePath.getSchemaFilePath(tableInfo.getTablePath()));
-    if (!schemaFile.exists()) {
-      if (!schemaFile.getParentFile().mkdirs()) {
-        throw new IOException(
-            "Unable to create directory: " + schemaFile.getParentFile().getAbsolutePath());
-      }
-    }
-    ThriftWriter thriftWriter = new ThriftWriter(schemaFile.getAbsolutePath(), false);
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(tableInfo.getTablePath());
+    String metadataPath = CarbonTablePath.getMetadataPath(tableInfo.getTablePath());
+    FileFactory.mkdirs(metadataPath);
+    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
     thriftWriter.open(FileWriteOperation.OVERWRITE);
     thriftWriter.write(schemaConverter
         .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
             tableInfo.getFactTable().getTableName()));
     thriftWriter.close();
-    schemaFile.setLastModifiedTime(System.currentTimeMillis());
+    FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(System.currentTimeMillis());
   }
 
   public static HiveMetaHook getMetaHook() {
@@ -312,4 +308,5 @@ public class HiveCarbonUtil {
     }
     return tokens.toArray(new String[tokens.size()]);
   }
-}
\ No newline at end of file
+
+}
diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/Hive2CarbonExpressionTest.java b/integration/hive/src/test/java/org/apache/carbondata/hive/Hive2CarbonExpressionTest.java
index 6b5a92a..b17e74e 100644
--- a/integration/hive/src/test/java/org/apache/carbondata/hive/Hive2CarbonExpressionTest.java
+++ b/integration/hive/src/test/java/org/apache/carbondata/hive/Hive2CarbonExpressionTest.java
@@ -24,6 +24,8 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.IndexFilter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonInputFormat;
@@ -77,6 +79,7 @@ public class Hive2CarbonExpressionTest {
       Assert.fail("create table failed: " + e.getMessage());
     }
   }
+
   @Test
   public void testEqualHiveFilter() throws IOException {
     ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "id", null, false);
@@ -351,4 +354,20 @@ public class Hive2CarbonExpressionTest {
     Assert.assertEquals(0, list.size());
 
   }
+
+  @Test
+  public void testFilterOnDate() throws IOException {
+    ExprNodeDesc column =
+        new ExprNodeColumnDesc(TypeInfoFactory.dateTypeInfo, "datee", null, false);
+    ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, "2020-01-01");
+    List<ExprNodeDesc> children = Lists.newArrayList();
+    children.add(column);
+    children.add(constant);
+    ExprNodeGenericFuncDesc node =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.dateTypeInfo, new GenericUDFOPEqual(),
+            children);
+    Expression expression = Hive2CarbonExpression.convertExprHive2Carbon(node);
+    assert (((LiteralExpression) expression.getChildren().get(1)).getLiteralExpValue().toString()
+        .equalsIgnoreCase("2020-01-01"));
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
index 74c3645..dacc309 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.loading;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.exception.NoRetryException;
@@ -68,21 +67,6 @@ public class DataLoadExecutor {
   }
 
   /**
-   * This method will remove any bad record key from the map entry
-   *
-   * @param carbonTableIdentifier
-   * @return
-   */
-  private boolean badRecordFound(CarbonTableIdentifier carbonTableIdentifier) {
-    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
-    boolean badRecordKeyFound = false;
-    if (null != BadRecordsLogger.hasBadRecord(badRecordLoggerKey)) {
-      badRecordKeyFound = true;
-    }
-    return badRecordKeyFound;
-  }
-
-  /**
    * Method to clean all the resource
    */
   public void close() {