You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/26 14:37:11 UTC

carbondata git commit: [CARBONDATA-1323] Presto Optimization for Integration Layer

Repository: carbondata
Updated Branches:
  refs/heads/master 06ddd82f6 -> f2bb8d380


[CARBONDATA-1323] Presto Optimization for Integration Layer

This closes #1190


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f2bb8d38
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f2bb8d38
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f2bb8d38

Branch: refs/heads/master
Commit: f2bb8d3804973e02a45fb4f1ac6cb458590af1e5
Parents: 06ddd82
Author: Bhavya Aggarwal <bh...@knoldus.com>
Authored: Fri Jul 21 13:06:46 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 26 22:36:56 2017 +0800

----------------------------------------------------------------------
 integration/presto/pom.xml                      | 27 +++++++
 .../carbondata/presto/CarbondataPageSource.java | 75 +++++++++++++-----
 .../presto/CarbondataRecordCursor.java          | 83 +++++++++-----------
 .../carbondata/presto/CarbondataRecordSet.java  | 42 +++++-----
 .../presto/CarbonDictionaryDecodeSupport.scala  | 65 +++++++++++++++
 5 files changed, 208 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 25eb6a7..4619413 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -228,6 +228,33 @@
             <skip>true</skip>
           </configuration>
         </plugin>
+        <plugin>
+          <groupId>org.scala-tools</groupId>
+          <artifactId>maven-scala-plugin</artifactId>
+          <version>2.15.2</version>
+          <executions>
+            <execution>
+              <id>compile</id>
+              <goals>
+                <goal>compile</goal>
+              </goals>
+              <phase>compile</phase>
+            </execution>
+            <execution>
+              <id>testCompile</id>
+              <goals>
+                <goal>testCompile</goal>
+              </goals>
+              <phase>test</phase>
+            </execution>
+            <execution>
+              <phase>process-resources</phase>
+              <goals>
+                <goal>compile</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
       </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 7c50c66..c03983e 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -27,7 +27,10 @@ import com.facebook.presto.spi.Page;
 import com.facebook.presto.spi.PageBuilder;
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.LazyBlock;
+import com.facebook.presto.spi.block.LazyBlockLoader;
 import com.facebook.presto.spi.type.DecimalType;
 import com.facebook.presto.spi.type.Type;
 import io.airlift.slice.Slice;
@@ -51,21 +54,20 @@ public class CarbondataPageSource implements ConnectorPageSource {
   private final PageBuilder pageBuilder;
   private boolean closed;
   private final char[] buffer = new char[100];
+  private Block[] blocks;
 
-  public CarbondataPageSource(RecordSet recordSet)
-  {
+  public CarbondataPageSource(RecordSet recordSet) {
     this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
   }
 
-  public CarbondataPageSource(List<Type> types, RecordCursor cursor)
-  {
+  public CarbondataPageSource(List<Type> types, RecordCursor cursor) {
     this.cursor = requireNonNull(cursor, "cursor is null");
     this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
     this.pageBuilder = new PageBuilder(this.types);
+    this.blocks = new Block[types.size()];
   }
 
-  public RecordCursor getCursor()
-  {
+  public RecordCursor getCursor() {
     return cursor;
   }
 
@@ -86,6 +88,9 @@ public class CarbondataPageSource implements ConnectorPageSource {
   }
 
   @Override public Page getNextPage() {
+    BlockBuilder output;
+    Page page;
+    int size = types.size();
     if (!closed) {
       int i;
       for (i = 0; i < ROWS_PER_REQUEST; i++) {
@@ -98,8 +103,9 @@ public class CarbondataPageSource implements ConnectorPageSource {
         }
 
         pageBuilder.declarePosition();
-        for (int column = 0; column < types.size(); column++) {
-          BlockBuilder output = pageBuilder.getBlockBuilder(column);
+
+        for (int column = 0; column < size; column++) {
+          output = pageBuilder.getBlockBuilder(column);
           if (cursor.isNull(column)) {
             output.appendNull();
           } else {
@@ -113,8 +119,7 @@ public class CarbondataPageSource implements ConnectorPageSource {
               type.writeDouble(output, cursor.getDouble(column));
             } else if (javaType == Slice.class) {
               Slice slice = cursor.getSlice(column);
-              if(type instanceof  DecimalType)
-              {
+              if (type instanceof DecimalType) {
                 if (isShortDecimal(type)) {
                   type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
                 } else {
@@ -127,6 +132,8 @@ public class CarbondataPageSource implements ConnectorPageSource {
               type.writeObject(output, cursor.getObject(column));
             }
           }
+          blocks[column] = new LazyBlock(output.getPositionCount(),
+              new CarbonBlockLoader(output.build(), types.get(column)));
         }
       }
     }
@@ -135,10 +142,16 @@ public class CarbondataPageSource implements ConnectorPageSource {
     if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
       return null;
     }
-    Page page = pageBuilder.build();
+
+    if (blocks != null && blocks.length > 0) {
+      page = new Page(blocks[0].getPositionCount(), blocks);
+    } else {
+      page = pageBuilder.build();
+    }
+
     pageBuilder.reset();
     return page;
- }
+  }
 
   @Override public long getSystemMemoryUsage() {
     return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
@@ -150,29 +163,49 @@ public class CarbondataPageSource implements ConnectorPageSource {
 
   }
 
-  private long parseLong(DecimalType type, Slice slice, int offset, int length)
-  {
+  private long parseLong(DecimalType type, Slice slice, int offset, int length) {
     BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
     return decimal.unscaledValue().longValue();
   }
 
-
-  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
-  {
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) {
     BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
     return encodeUnscaledValue(decimal.unscaledValue());
   }
 
-  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
-  {
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) {
     checkArgument(length < buffer.length);
     for (int i = 0; i < length; i++) {
       buffer[i] = (char) slice.getByte(offset + i);
     }
     BigDecimal decimal = new BigDecimal(buffer, 0, length);
-    checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+    checkState(decimal.scale() <= type.getScale(),
+        "Read decimal value scale larger than column scale");
     decimal = decimal.setScale(type.getScale(), HALF_UP);
-    checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+    checkState(decimal.precision() <= type.getPrecision(),
+        "Read decimal precision larger than column precision");
     return decimal;
   }
+
+  /**
+   * Loader that installs a pre-built Block into its LazyBlock the first time load is called.
+   */
+  private final class CarbonBlockLoader implements LazyBlockLoader<LazyBlock> {
+    private boolean loaded;
+    private Block dataBlock;
+    private Type type;
+
+    public CarbonBlockLoader(Block dataBlock, Type type) {
+      this.dataBlock = dataBlock;
+      this.type = type;
+    }
+
+    @Override public void load(LazyBlock block) {
+      if (loaded) {
+        return;
+      }
+      block.setBlock(dataBlock);
+      loaded = true;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index 2e97dc0..d6b1422 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -17,29 +17,24 @@
 
 package org.apache.carbondata.presto;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.type.DecimalType;
 import com.facebook.presto.spi.type.Decimals;
 import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.block.Block;
-import com.facebook.presto.spi.block.BlockBuilder;
-import com.facebook.presto.spi.block.BlockBuilderStatus;
-
-import com.google.common.base.Strings;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
-import io.airlift.slice.Slices;
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
+import scala.Int;
+import scala.Tuple3;
 
 import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
 import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
@@ -55,22 +50,24 @@ public class CarbondataRecordCursor implements RecordCursor {
   private static final Logger log = Logger.get(CarbondataRecordCursor.class);
   private final List<CarbondataColumnHandle> columnHandles;
 
-  private List<String> fields;
+  private Object[] fields;
   private CarbondataSplit split;
   private CarbonIterator<Object[]> rowCursor;
-  private CarbonReadSupport<Object[]> readSupport;
+  private CarbonDictionaryDecodeReaderSupport readSupport;
+  private Tuple3<DataType,Dictionary,Int>[] dictionary;
 
   private long totalBytes;
   private long nanoStart;
   private long nanoEnd;
 
-  public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+  public CarbondataRecordCursor(CarbonDictionaryDecodeReaderSupport readSupport,
       CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
-      CarbondataSplit split) {
+      CarbondataSplit split, Tuple3<DataType,Dictionary,Int>[] dictionaries ) {
     this.rowCursor = carbonIterator;
     this.columnHandles = columnHandles;
     this.readSupport = readSupport;
     this.totalBytes = 0;
+    this.dictionary = dictionaries;
   }
 
   @Override public long getTotalBytes() {
@@ -101,20 +98,8 @@ public class CarbondataRecordCursor implements RecordCursor {
     }
 
     if (rowCursor.hasNext()) {
-      Object[] columns = readSupport.readRow(rowCursor.next());
-      fields = new ArrayList<String>();
-      if(columns != null && columns.length > 0)
-      {
-        for(Object value : columns){
-          if(value != null )
-          {
-            fields.add(value.toString());
-          } else {
-            fields.add(null);
-          }
-        }
-      }
-      totalBytes += columns.length;
+       fields = readSupport.readRow(rowCursor.next(),dictionary);
+       totalBytes += fields.length;
       return true;
     }
     return false;
@@ -122,22 +107,30 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public boolean getBoolean(int field) {
     checkFieldType(field, BOOLEAN);
-    return Boolean.parseBoolean(getFieldValue(field));
+    return (Boolean)getFieldValue(field);
   }
 
   @Override public long getLong(int field) {
-    String timeStr = getFieldValue(field);
+    Object obj = getFieldValue(field);
+    Long timeStr = 0L;
+    if( obj instanceof  Integer ){
+      timeStr = ((Integer)obj).longValue();
+    } else if( obj instanceof Long ) {
+      timeStr = (Long)obj;
+    } else {
+      timeStr = Math.round(Double.parseDouble(obj.toString()));
+    }
     Type actual = getType(field);
     if(actual instanceof TimestampType){
-      return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+      return new Timestamp(timeStr).getTime()/1000;
     }
     //suppose the
-    return Math.round(Double.parseDouble(getFieldValue(field)));
+    return timeStr;
   }
 
   @Override public double getDouble(int field) {
     checkFieldType(field, DOUBLE);
-    return Double.parseDouble(getFieldValue(field));
+    return (Double)getFieldValue(field);
   }
 
   @Override public Slice getSlice(int field) {
@@ -150,8 +143,8 @@ public class CarbondataRecordCursor implements RecordCursor {
       } else {
         checkFieldType(field, DecimalType.createDecimalType());
       }
-      String fieldValue = getFieldValue(field);
-      BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+      Object fieldValue = getFieldValue(field);
+      BigDecimal bigDecimalValue = new BigDecimal(fieldValue.toString());
       if (isShortDecimal(decimalType)) {
         return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
       } else {
@@ -174,7 +167,7 @@ public class CarbondataRecordCursor implements RecordCursor {
       }
     } else {
       checkFieldType(field, VARCHAR);
-      return utf8Slice(getFieldValue(field));
+      return utf8Slice(getFieldValue(field).toString());
     }
   }
 
@@ -184,12 +177,12 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public boolean isNull(int field) {
     checkArgument(field < columnHandles.size(), "Invalid field index");
-    return Strings.isNullOrEmpty(getFieldValue(field));
+    return getFieldValue(field) == null;
   }
 
-  String getFieldValue(int field) {
+  Object getFieldValue(int field) {
     checkState(fields != null, "Cursor has not been advanced yet");
-    return fields.get(field);
+    return fields[field];
   }
 
   private void checkFieldType(int field, Type expected) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d75cbfb..661e83f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -17,9 +17,10 @@
 
 package org.apache.carbondata.presto;
 
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -32,16 +33,20 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.BatchResult;
 import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import scala.Tuple3;
 
 import static org.apache.carbondata.presto.Types.checkType;
 
+//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
 public class CarbondataRecordSet implements RecordSet {
 
   private CarbonTable carbonTable;
@@ -53,7 +58,7 @@ public class CarbondataRecordSet implements RecordSet {
   private List<CarbondataColumnHandle> columns;
   private QueryExecutor queryExecutor;
 
-  private CarbonReadSupport<Object[]> readSupport;
+  private CarbonDictionaryDecodeReaderSupport readSupport;
 
   public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
       ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
@@ -63,7 +68,7 @@ public class CarbondataRecordSet implements RecordSet {
     this.rebuildConstraints = this.split.getRebuildConstraints();
     this.queryModel = queryModel;
     this.columns = columns;
-    this.readSupport = new DictionaryDecodeReadSupport();
+    this.readSupport = new CarbonDictionaryDecodeReaderSupport();
   }
 
   //todo support later
@@ -84,24 +89,25 @@ public class CarbondataRecordSet implements RecordSet {
     tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
         split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
         split.getLocalInputSplit().getLocations().toArray(new String[0]),
-        split.getLocalInputSplit().getLength(),new BlockletInfos(),
+        split.getLocalInputSplit().getLength(), new BlockletInfos(),
         //blockletInfos,
-        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()),null));
+        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null));
     queryModel.setTableBlockInfos(tableBlockInfoList);
 
     queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
 
-    //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
     try {
-      readSupport
+
+      Tuple3[] dict = readSupport
           .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
       CarbonIterator<Object[]> carbonIterator =
           new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
-      RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+      RecordCursor rc =
+          new CarbondataRecordCursor(readSupport, carbonIterator, columns, split, dict);
       return rc;
     } catch (QueryExecutionException e) {
-       throw new RuntimeException(e.getMessage(), e);
-   } catch (Exception ex) {
+      throw new RuntimeException(e.getMessage(), e);
+    } catch (Exception ex) {
       throw new RuntimeException(ex.getMessage(), ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f2bb8d38/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
new file mode 100644
index 0000000..fbdfebd
--- /dev/null
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto
+
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+
+
+/** Swaps dictionary keys for dictionary values in the dictionary-encoded columns of a row. */
+class CarbonDictionaryDecodeReaderSupport[T] {
+  /**
+   * Returns a (data type, forward dictionary, index) triple per column accepted by dictChecker.
+   */
+  def initialize(carbonColumns: Array[CarbonColumn],
+      absoluteTableIdentifier: AbsoluteTableIdentifier): Array[(DataType, Dictionary, Int)] = {
+    // Lazy: requested once, on first use, rather than once per dictionary column.
+    lazy val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      CacheProvider.getInstance()
+        .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath)
+
+    carbonColumns.zipWithIndex.filter(dictChecker(_)).map { case (carbonColumn, index) =>
+      val dict: Dictionary = forwardDictionaryCache
+        .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier,
+          carbonColumn.getColumnIdentifier,
+          carbonColumn.getDataType))
+      (carbonColumn.getDataType, dict, index)
+    }
+  }
+
+  /** True when the column is DICTIONARY-encoded but neither DIRECT_DICTIONARY nor complex. */
+  private def dictChecker(carbonColumWithIndex: (CarbonColumn, Int)): Boolean = {
+    val (carbonColumn, _) = carbonColumWithIndex
+    !carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumn.isComplex &&
+      carbonColumn.hasEncoding(Encoding.DICTIONARY)
+  }
+
+  /**
+   * Replaces the key at each dictionary position of `data` in place and returns `data`.
+   */
+  def readRow(data: Array[Object],
+      dictionaries: Array[(DataType, Dictionary, Int)]): Array[Object] = {
+    dictionaries.foreach { case (_, dict, position) =>
+      data(position) = dict.getDictionaryValueForKey(data(position).asInstanceOf[Int])
+    }
+    data
+  }
+}