You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:05:21 UTC

[45/50] [abbrv] carbondata git commit: [CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata

[CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata

This closes #1142


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cbe14197
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cbe14197
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cbe14197

Branch: refs/heads/datamap
Commit: cbe141976a53a558b84d6e31baf3ec54a9bc38cc
Parents: 285ce72
Author: Bhavya <bh...@knoldus.com>
Authored: Thu Jul 6 11:53:03 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 17:40:11 2017 +0800

----------------------------------------------------------------------
 .../core/stats/QueryStatisticsRecorderImpl.java |  80 +++---
 .../carbondata/hadoop/CarbonInputFormat.java    |   7 +-
 .../carbondata/hive/CarbonArrayInspector.java   |   4 -
 .../hive/CarbonDictionaryDecodeReadSupport.java | 288 +++++++++++++++++++
 .../carbondata/hive/CarbonHiveInputSplit.java   |  23 +-
 .../carbondata/hive/CarbonHiveRecordReader.java |  67 ++---
 .../apache/carbondata/hive/CarbonHiveSerDe.java |  36 +--
 .../hive/MapredCarbonInputFormat.java           | 129 ++++++---
 .../hive/server/HiveEmbeddedServer2.java        |   1 +
 9 files changed, 477 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
index f84a674..ffb7d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -101,45 +101,47 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser
     long scannedPages = 0;
     try {
       for (QueryStatistic statistic : queryStatistics) {
-        switch (statistic.getMessage()) {
-          case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
-            load_blocks_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
-            scan_blocks_time += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
-            scan_blocks_num += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.LOAD_DICTIONARY:
-            load_dictionary_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.RESULT_SIZE:
-            result_size += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.EXECUTOR_PART:
-            total_executor_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
-            total_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
-            valid_scan_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.VALID_PAGE_SCANNED:
-            valid_pages_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
-            total_pages = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.READ_BLOCKlET_TIME:
-            readTime = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.PAGE_SCANNED:
-            scannedPages = statistic.getCount();
-            break;
-          default:
-            break;
+        if (statistic.getMessage() != null) {
+          switch (statistic.getMessage()) {
+            case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
+              load_blocks_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
+              scan_blocks_time += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
+              scan_blocks_num += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.LOAD_DICTIONARY:
+              load_dictionary_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.RESULT_SIZE:
+              result_size += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.EXECUTOR_PART:
+              total_executor_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
+              total_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
+              valid_scan_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.VALID_PAGE_SCANNED:
+              valid_pages_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
+              total_pages = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.READ_BLOCKlET_TIME:
+              readTime = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.PAGE_SCANNED:
+              scannedPages = statistic.getCount();
+              break;
+            default:
+              break;
+          }
         }
       }
       String headers =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..16b5d69 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -444,9 +444,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           }
         }
       }
+
+      // For Hive integration if we have to get the stats we have to fetch hive.query.id
+      String query_id = job.getConfiguration().get("query.id") != null ?
+          job.getConfiguration().get("query.id") :
+          job.getConfiguration().get("hive.query.id");
       statistic
           .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-      recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+      recorder.recordStatisticsForDriver(statistic, query_id);
       return resultFilterredBlocks;
     } finally {
       // clean up the access count for a segment as soon as its usage is complete so that in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
index 49e068a..b26c959 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.hive;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -122,9 +121,6 @@ class CarbonArrayInspector implements SettableListObjectInspector {
 
       final Writable[] array = ((ArrayWritable) subObj).get();
       final List<Writable> list = Arrays.asList(array);
-
-      Collections.addAll(list, array);
-
       return list;
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..bc66d49
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+
+/**
+ *  This is the class to decode dictionary encoded column data back to its original value.
+ */
+public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+  protected Dictionary[] dictionaries;
+
+  protected DataType[] dataTypes;
+  /**
+   * carbon columns
+   */
+  protected CarbonColumn[] carbonColumns;
+
+  protected Writable[] writableArr;
+
+  /**
+   * This initialization is done inside executor task
+   * for column dictionary involved in decoding.
+   *
+   * @param carbonColumns           column list
+   * @param absoluteTableIdentifier table identifier
+   */
+  @Override public void initialize(CarbonColumn[] carbonColumns,
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+    this.carbonColumns = carbonColumns;
+    dictionaries = new Dictionary[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
+          .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
+        CacheProvider cacheProvider = CacheProvider.getInstance();
+        Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
+            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+        dataTypes[i] = carbonColumns[i].getDataType();
+        dictionaries[i] = forwardDictionaryCache.get(
+            new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+                carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
+      } else {
+        dataTypes[i] = carbonColumns[i].getDataType();
+      }
+    }
+  }
+
+  @Override public T readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    writableArr = new Writable[data.length];
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+      try {
+        writableArr[i] = createWritableObject(data[i], carbonColumns[i]);
+      } catch (IOException e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
+    }
+
+    return (T) writableArr;
+  }
+
+  /**
+   * to book keep the dictionary cache or update access count for each
+   * column involved during decode, to facilitate LRU cache policy if memory
+   * threshold is reached
+   */
+  @Override public void close() {
+    if (dictionaries == null) {
+      return;
+    }
+    for (int i = 0; i < dictionaries.length; i++) {
+      CarbonUtil.clearDictionaryCache(dictionaries[i]);
+    }
+  }
+
+  /**
+   * To Create the Writable from the CarbonData data
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    switch (dataType) {
+      case STRUCT:
+        return createStruct(obj, carbonColumn);
+      case ARRAY:
+        return createArray(obj, carbonColumn);
+      default:
+        return createWritablePrimitive(obj, carbonColumn);
+    }
+  }
+
+  /**
+   * Create Array Data for Array Datatype
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private ArrayWritable createArray(Object obj, CarbonColumn carbonColumn) throws IOException {
+    if (obj instanceof GenericArrayData) {
+      Object[] objArray = ((GenericArrayData) obj).array();
+      List<CarbonDimension> childCarbonDimensions = null;
+      CarbonDimension arrayDimension = null;
+      if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+        childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        arrayDimension = childCarbonDimensions.get(0);
+      }
+      List array = new ArrayList();
+      if (objArray != null) {
+        for (int i = 0; i < objArray.length; i++) {
+          Object curObj = objArray[i];
+          Writable newObj = createWritableObject(curObj, arrayDimension);
+          array.add(newObj);
+        }
+      }
+      if (array.size() > 0) {
+        ArrayWritable subArray = new ArrayWritable(Writable.class,
+            (Writable[]) array.toArray(new Writable[array.size()]));
+        return new ArrayWritable(Writable.class, new Writable[] { subArray });
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Create the Struct data for the Struct Datatype
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private ArrayWritable createStruct(Object obj, CarbonColumn carbonColumn) throws IOException {
+    if (obj instanceof GenericInternalRow) {
+      Object[] objArray = ((GenericInternalRow) obj).values();
+      List<CarbonDimension> childCarbonDimensions = null;
+      if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+        childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      }
+      Writable[] arr = new Writable[objArray.length];
+      for (int i = 0; i < objArray.length; i++) {
+
+        arr[i] = createWritableObject(objArray[i], childCarbonDimensions.get(i));
+      }
+      return new ArrayWritable(Writable.class, arr);
+    }
+    throw new IOException("DataType not supported in Carbondata");
+  }
+
+  /**
+   * This method will create the Writable Objects for primitives.
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private Writable createWritablePrimitive(Object obj, CarbonColumn carbonColumn)
+      throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    if (obj == null) {
+      return null;
+    }
+    switch (dataType) {
+      case NULL:
+        return null;
+      case DOUBLE:
+        return new DoubleWritable((double) obj);
+      case INT:
+        return new IntWritable((int) obj);
+      case LONG:
+        return new LongWritable((long) obj);
+      case SHORT:
+        return new ShortWritable((Short) obj);
+      case DATE:
+        return new DateWritable(new Date((Integer) obj));
+      case TIMESTAMP:
+        return new TimestampWritable(new Timestamp((long) obj));
+      case STRING:
+        return new Text(obj.toString());
+      case DECIMAL:
+        return new HiveDecimalWritable(
+            HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+    }
+    throw new IOException("Unknown primitive : " + dataType.getName());
+  }
+
+  /**
+   * If we need to use the same Writable[] then we can use this method
+   *
+   * @param writable
+   * @param obj
+   * @param carbonColumn
+   * @throws IOException
+   */
+  private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn)
+      throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    if (obj == null) {
+      writable.write(null);
+    }
+    switch (dataType) {
+      case DOUBLE:
+        ((DoubleWritable) writable).set((double) obj);
+        break;
+      case INT:
+        ((IntWritable) writable).set((int) obj);
+        break;
+      case LONG:
+        ((LongWritable) writable).set((long) obj);
+        break;
+      case SHORT:
+        ((ShortWritable) writable).set((short) obj);
+        break;
+      case DATE:
+        ((DateWritable) writable).set(new Date((Long) obj));
+        break;
+      case TIMESTAMP:
+        ((TimestampWritable) writable).set(new Timestamp((long) obj));
+        break;
+      case STRING:
+        ((Text) writable).set(obj.toString());
+        break;
+      case DECIMAL:
+        ((HiveDecimalWritable) writable)
+            .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index bfe4d27..b922295 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -113,8 +113,7 @@ public class CarbonHiveInputSplit extends FileSplit
   }
 
   public static CarbonHiveInputSplit from(String segmentId, FileSplit split,
-      ColumnarFormatVersion version)
-      throws IOException {
+      ColumnarFormatVersion version) throws IOException {
     return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
         split.getLocations(), version);
   }
@@ -151,8 +150,7 @@ public class CarbonHiveInputSplit extends FileSplit
     return segmentId;
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
+  @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
@@ -162,10 +160,10 @@ public class CarbonHiveInputSplit extends FileSplit
     for (int i = 0; i < numInvalidSegment; i++) {
       invalidSegments.add(in.readUTF());
     }
+    this.numberOfBlocklets = in.readInt();
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
+  @Override public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
     out.writeShort(version.number());
@@ -174,6 +172,7 @@ public class CarbonHiveInputSplit extends FileSplit
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
     }
+    out.writeInt(numberOfBlocklets);
   }
 
   public List<String> getInvalidSegments() {
@@ -213,8 +212,7 @@ public class CarbonHiveInputSplit extends FileSplit
     return bucketId;
   }
 
-  @Override
-  public int compareTo(Distributable o) {
+  @Override public int compareTo(Distributable o) {
     if (o == null) {
       return -1;
     }
@@ -264,18 +262,15 @@ public class CarbonHiveInputSplit extends FileSplit
     return 0;
   }
 
-  @Override
-  public String getBlockPath() {
+  @Override public String getBlockPath() {
     return getPath().getName();
   }
 
-  @Override
-  public List<Long> getMatchedBlocklets() {
+  @Override public List<Long> getMatchedBlocklets() {
     return null;
   }
 
-  @Override
-  public boolean fullScan() {
+  @Override public boolean fullScan() {
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e4df02e..2a92185 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -62,6 +62,8 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
 
   private ArrayWritable valueObj = null;
   private CarbonObjectInspector objInspector;
+  private long recordReaderCounter = 0;
+  private int[] columnIds;
 
   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
       InputSplit inputSplit, JobConf jobConf) throws IOException {
@@ -88,17 +90,12 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     } catch (QueryExecutionException e) {
       throw new IOException(e.getMessage(), e.getCause());
     }
-    if (valueObj == null) {
-      valueObj =
-          new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
-    }
-
     final TypeInfo rowTypeInfo;
     final List<String> columnNames;
     List<TypeInfo> columnTypes;
     // Get column names and sort order
     final String colIds = conf.get("hive.io.file.readcolumn.ids");
-    final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
+    final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
 
     if (columnNameProperty.length() == 0) {
@@ -111,47 +108,39 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     } else {
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
     }
+
+    if (valueObj == null) {
+      valueObj = new ArrayWritable(Writable.class, new Writable[columnTypes.size()]);
+    }
+
     if (!colIds.equals("")) {
       String[] arraySelectedColId = colIds.split(",");
       List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-
-      for (String anArrayColId : arraySelectedColId) {
-        reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
+      columnIds = new int[arraySelectedColId.length];
+      int columnId = 0;
+      for (int j = 0; j < arraySelectedColId.length; j++) {
+        columnId = Integer.parseInt(arraySelectedColId[j]);
+        columnIds[j] = columnId;
       }
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-    } else {
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
     }
+
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
   }
 
   @Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
     if (carbonIterator.hasNext()) {
       Object obj = readSupport.readRow(carbonIterator.next());
-      ArrayWritable tmpValue;
-      try {
-        tmpValue = createArrayWritable(obj);
-      } catch (SerDeException se) {
-        throw new IOException(se.getMessage(), se.getCause());
-      }
-
-      if (value != tmpValue) {
-        final Writable[] arrValue = value.get();
-        final Writable[] arrCurrent = tmpValue.get();
-        if (valueObj != null && arrValue.length == arrCurrent.length) {
-          System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
-        } else {
-          if (arrValue.length != arrCurrent.length) {
-            throw new IOException(
-                "CarbonHiveInput : size of object differs. Value" + " size :  " + arrValue.length
-                    + ", Current Object size : " + arrCurrent.length);
-          } else {
-            throw new IOException("CarbonHiveInput can not support RecordReaders that"
-                + " don't return same key & value & value is null");
-          }
+      recordReaderCounter++;
+      Writable[] objArray = (Writable[]) obj;
+      Writable[] sysArray = new Writable[value.get().length];
+      if (columnIds != null && columnIds.length > 0 && objArray.length == columnIds.length) {
+        for (int i = 0; i < columnIds.length; i++) {
+          sysArray[columnIds[i]] = objArray[i];
         }
+        value.set(sysArray);
+      } else {
+        value.set(objArray);
       }
       return true;
     } else {
@@ -159,10 +148,6 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     }
   }
 
-  private ArrayWritable createArrayWritable(Object obj) throws SerDeException {
-    return createStruct(obj, objInspector);
-  }
-
   @Override public Void createKey() {
     return null;
   }
@@ -172,7 +157,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
   }
 
   @Override public long getPos() throws IOException {
-    return 0;
+    return recordReaderCounter;
   }
 
   @Override public float getProgress() throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index f66f3ed..2980ad3 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -79,11 +79,9 @@ class CarbonHiveSerDe extends AbstractSerDe {
 
     final TypeInfo rowTypeInfo;
     final List<String> columnNames;
-    final List<String> reqColNames;
     final List<TypeInfo> columnTypes;
     // Get column names and sort order
     assert configuration != null;
-    final String colIds = configuration.get("hive.io.file.readcolumn.ids");
 
     final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -98,29 +96,17 @@ class CarbonHiveSerDe extends AbstractSerDe {
     } else {
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
     }
-    if (colIds != null && !colIds.equals("")) {
-      reqColNames = new ArrayList<String>();
-
-      String[] arraySelectedColId = colIds.split(",");
-      List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-      for (String anArrayColId : arraySelectedColId) {
-        reqColNames.add(columnNames.get(Integer.parseInt(anArrayColId)));
-        reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
-      }
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-    }
-    else {
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-
-      // Stats part
-      serializedSize = 0;
-      deserializedSize = 0;
-      status = LAST_OPERATION.UNKNOWN;
-    }
+
+
+
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+
+    // Stats part
+    serializedSize = 0;
+    deserializedSize = 0;
+    status = LAST_OPERATION.UNKNOWN;
   }
 
   @Override public Class<? extends Writable> getSerializedClass() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 7a1c9db..58f25c9 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -17,12 +17,12 @@
 package org.apache.carbondata.hive;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
@@ -31,8 +31,11 @@ import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.io.ArrayWritable;
@@ -42,9 +45,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
 
 public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
+  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
 
   @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
     org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
@@ -63,47 +68,64 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
   @Override
   public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
       Reporter reporter) throws IOException {
-    QueryModel queryModel = getQueryModel(jobConf);
-    CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
+    String path = null;
+    if (inputSplit instanceof CarbonHiveInputSplit) {
+      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+    }
+    QueryModel queryModel = getQueryModel(jobConf, path);
+    CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
     return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
   }
 
-  private QueryModel getQueryModel(Configuration configuration) throws IOException {
-    CarbonTable carbonTable = getCarbonTable(configuration);
+  /**
+   * this method will read the schema from the physical file and populate into CARBON_TABLE
+   *
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration, String paths)
+      throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    String validInputPath = null;
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    } else {
+      if (paths != null) {
+        for (String inputPath : inputPaths) {
+          if (paths.startsWith(inputPath)) {
+            validInputPath = inputPath;
+            break;
+          }
+        }
+      }
+    }
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(validInputPath);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
+  }
+
+  private static CarbonTable getCarbonTable(Configuration configuration, String path)
+      throws IOException {
+    populateCarbonTable(configuration, path);
+    // read it from schema file in the store
+    String carbonTableStr = configuration.get(CARBON_TABLE);
+    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  }
+
+  private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
+    CarbonTable carbonTable = getCarbonTable(configuration, path);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
 
     StringBuilder colNames = new StringBuilder();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
 
-    // query plan includes projection column
-    String projection = getColumnProjection(configuration);
-    if (projection == null) {
-      projection = configuration.get("hive.io.file.readcolumn.names");
-    }
-    if (projection.equals("")) {
-      List<CarbonDimension> carbonDimensionList = carbonTable.getAllDimensions();
-      List<CarbonMeasure> carbonMeasureList = carbonTable.getAllMeasures();
-
-      for (CarbonDimension aCarbonDimensionList : carbonDimensionList) {
-        colNames = new StringBuilder((colNames + (aCarbonDimensionList.getColName())) + ",");
-      }
-      if (carbonMeasureList.size() < 1) {
-        colNames = new StringBuilder(colNames.substring(0, colNames.lastIndexOf(",")));
-      }
-      for (int index = 0; index < carbonMeasureList.size(); index++) {
-        if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) {
-          if (index == carbonMeasureList.size() - 1) {
-            colNames.append(carbonMeasureList.get(index).getColName());
-          } else {
-            colNames =
-                new StringBuilder((colNames + (carbonMeasureList.get(index).getColName())) + ",");
-          }
-        }
-      }
-      projection = colNames.toString().trim();
-      configuration.set("hive.io.file.readcolumn.names", colNames.toString());
-    }
+    String projection = getProjection(configuration, carbonTable,
+        identifier.getCarbonTableIdentifier().getTableName());
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
     QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
     // set the filter to the query model in order to filter blocklet before scan
@@ -115,6 +137,45 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     return queryModel;
   }
 
+  /**
+   * Return the Projection for the CarbonQuery.
+   *
+   * @param configuration
+   * @param carbonTable
+   * @param tableName
+   * @return
+   */
+  private String getProjection(Configuration configuration, CarbonTable carbonTable,
+      String tableName) {
+    // query plan includes projection column
+    String projection = getColumnProjection(configuration);
+    if (projection == null) {
+      projection = configuration.get("hive.io.file.readcolumn.names");
+    }
+    List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(tableName);
+    List<String> carbonColumnNames = new ArrayList<>();
+    StringBuilder allColumns = new StringBuilder();
+    StringBuilder projectionColumns = new StringBuilder();
+    for (CarbonColumn column : carbonColumns) {
+      carbonColumnNames.add(column.getColName());
+      allColumns.append(column.getColName() + ",");
+    }
+
+    if (!projection.equals("")) {
+      String[] columnNames = projection.split(",");
+      //verify that the columns parsed by Hive exist in the table
+      for (String col : columnNames) {
+        //show columns command will return these data
+        if (carbonColumnNames.contains(col)) {
+          projectionColumns.append(col + ",");
+        }
+      }
+      return projectionColumns.substring(0, projectionColumns.lastIndexOf(","));
+    } else {
+      return allColumns.substring(0, allColumns.lastIndexOf(","));
+    }
+  }
+
   @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
index d8705f8..ae931fb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
@@ -130,6 +130,7 @@ public class HiveEmbeddedServer2 {
     conf.set("hive.added.files.path", "");
     conf.set("hive.added.archives.path", "");
     conf.set("fs.default.name", "file:///");
+    conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
 
     // clear mapred.job.tracker - Hadoop defaults to 'local' if not defined. Hive however expects
     // this to be set to 'local' - if it's not, it does a remote execution (i.e. no child JVM)