You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/17 01:56:59 UTC

[13/15] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
new file mode 100644
index 0000000..defe766
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * It is just a normal row to store data. Implementation classes could be safe and unsafe.
+ * TODO move this class a global row and use across loading after DataType is changed class
+ */
+public abstract class DataMapRow {
+
+  protected DataMapSchema[] schemas;
+
+  public DataMapRow(DataMapSchema[] schemas) {
+    this.schemas = schemas;
+  }
+
+  public abstract byte[] getByteArray(int ordinal);
+
+  public abstract DataMapRow getRow(int ordinal);
+
+  public abstract void setRow(DataMapRow row, int ordinal);
+
+  public abstract void setByteArray(byte[] byteArray, int ordinal);
+
+  public abstract int getInt(int ordinal);
+
+  public abstract void setInt(int value, int ordinal);
+
+  public abstract void setByte(byte value, int ordinal);
+
+  public abstract byte getByte(int ordinal);
+
+  public abstract void setShort(short value, int ordinal);
+
+  public abstract short getShort(int ordinal);
+
+  public abstract void setLong(long value, int ordinal);
+
+  public abstract long getLong(int ordinal);
+
+  public abstract void setFloat(float value, int ordinal);
+
+  public abstract float getFloat(int ordinal);
+
+  public abstract void setDouble(double value, int ordinal);
+
+  public abstract double getDouble(int ordinal);
+
+  public int getTotalSizeInBytes() {
+    int len = 0;
+    for (int i = 0; i < schemas.length; i++) {
+      len += getSizeInBytes(i);
+    }
+    return len;
+  }
+
+  public int getSizeInBytes(int ordinal) {
+    switch (schemas[ordinal].getSchemaType()) {
+      case FIXED:
+        return schemas[ordinal].getLength();
+      case VARIABLE:
+        return getByteArray(ordinal).length + 2;
+      case STRUCT:
+        return getRow(ordinal).getTotalSizeInBytes();
+      default:
+        throw new UnsupportedOperationException("wrong type");
+    }
+  }
+
+  public int getColumnCount() {
+    return schemas.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
new file mode 100644
index 0000000..adec346
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Data map row.
+ */
+public class DataMapRowImpl extends DataMapRow {
+
+  private Object[] data;
+
+  public DataMapRowImpl(DataMapSchema[] schemas) {
+    super(schemas);
+    this.data = new Object[schemas.length];
+  }
+
+  @Override public byte[] getByteArray(int ordinal) {
+    return (byte[]) data[ordinal];
+  }
+
+  @Override public DataMapRow getRow(int ordinal) {
+    return (DataMapRow) data[ordinal];
+  }
+
+  @Override public void setByteArray(byte[] byteArray, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+    data[ordinal] = byteArray;
+  }
+
+  @Override public int getInt(int ordinal) {
+    return (Integer) data[ordinal];
+  }
+
+  @Override public void setInt(int value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.INT);
+    data[ordinal] = value;
+  }
+
+  @Override public void setByte(byte value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.BYTE);
+    data[ordinal] = value;
+  }
+
+  @Override public byte getByte(int ordinal) {
+    return (Byte) data[ordinal];
+  }
+
+  @Override public void setShort(short value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.SHORT);
+    data[ordinal] = value;
+  }
+
+  @Override public short getShort(int ordinal) {
+    return (Short) data[ordinal];
+  }
+
+  @Override public void setLong(long value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.LONG);
+    data[ordinal] = value;
+  }
+
+  @Override public long getLong(int ordinal) {
+    return (Long) data[ordinal];
+  }
+
+  @Override public void setFloat(float value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+    data[ordinal] = value;
+  }
+
+  @Override public float getFloat(int ordinal) {
+    return (Float) data[ordinal];
+  }
+
+  @Override public void setDouble(double value, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+    data[ordinal] = value;
+  }
+
+  @Override public void setRow(DataMapRow row, int ordinal) {
+    assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+    data[ordinal] = row;
+  }
+
+  @Override public double getDouble(int ordinal) {
+    return (Double) data[ordinal];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
new file mode 100644
index 0000000..ef78514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Unsafe implementation of data map row.
+ */
+public class UnsafeDataMapRow extends DataMapRow {
+
+  private MemoryBlock block;
+
+  private int pointer;
+
+  public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+    super(schemas);
+    this.block = block;
+    this.pointer = pointer;
+  }
+
+  @Override public byte[] getByteArray(int ordinal) {
+    int length;
+    int position = getPosition(ordinal);
+    switch (schemas[ordinal].getSchemaType()) {
+      case VARIABLE:
+        length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+        position += 2;
+        break;
+      default:
+        length = schemas[ordinal].getLength();
+    }
+    byte[] data = new byte[length];
+    unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
+        BYTE_ARRAY_OFFSET, data.length);
+    return data;
+  }
+
+  @Override public DataMapRow getRow(int ordinal) {
+    DataMapSchema[] childSchemas =
+        ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+    return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
+  }
+
+  @Override public void setByteArray(byte[] byteArray, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public int getInt(int ordinal) {
+    return unsafe
+        .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setInt(int value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public void setByte(byte value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public byte getByte(int ordinal) {
+    return unsafe
+        .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setShort(short value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public short getShort(int ordinal) {
+    return unsafe
+        .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setLong(long value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public long getLong(int ordinal) {
+    return unsafe
+        .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setFloat(float value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public float getFloat(int ordinal) {
+    return unsafe
+        .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setDouble(double value, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  @Override public double getDouble(int ordinal) {
+    return unsafe
+        .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+  }
+
+  @Override public void setRow(DataMapRow row, int ordinal) {
+    throw new UnsupportedOperationException("Not supported to set on unsafe row");
+  }
+
+  private int getPosition(int ordinal) {
+    int position = 0;
+    for (int i = 0; i < ordinal; i++) {
+      position += getSizeInBytes(i);
+    }
+    return position;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
new file mode 100644
index 0000000..80c68ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * It just have 2 types right now, either fixed or variable.
+ */
+public abstract class DataMapSchema {
+
+  protected DataType dataType;
+
+  public DataMapSchema(DataType dataType) {
+    this.dataType = dataType;
+  }
+
+  /**
+   * Either fixed or variable length.
+   *
+   * @return
+   */
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Gives length in case of fixed schema other wise returns length
+   *
+   * @return
+   */
+  public abstract int getLength();
+
+  /**
+   * schema type
+   * @return
+   */
+  public abstract DataMapSchemaType getSchemaType();
+
+  /*
+ * It has always fixed length, length cannot be updated later.
+ * Usage examples : all primitive types like short, int etc
+ */
+  public static class FixedDataMapSchema extends DataMapSchema {
+
+    private int length;
+
+    public FixedDataMapSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    public FixedDataMapSchema(DataType dataType, int length) {
+      super(dataType);
+      this.length = length;
+    }
+
+    @Override public int getLength() {
+      if (length == 0) {
+        return dataType.getSizeInBytes();
+      } else {
+        return length;
+      }
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.FIXED;
+    }
+  }
+
+  public static class VariableDataMapSchema extends DataMapSchema {
+
+    public VariableDataMapSchema(DataType dataType) {
+      super(dataType);
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.VARIABLE;
+    }
+  }
+
+  public static class StructDataMapSchema extends DataMapSchema {
+
+    private DataMapSchema[] childSchemas;
+
+    public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
+      super(dataType);
+      this.childSchemas = childSchemas;
+    }
+
+    @Override public int getLength() {
+      return dataType.getSizeInBytes();
+    }
+
+    public DataMapSchema[] getChildSchemas() {
+      return childSchemas;
+    }
+
+    @Override public DataMapSchemaType getSchemaType() {
+      return DataMapSchemaType.STRUCT;
+    }
+  }
+
+  public enum DataMapSchemaType {
+    FIXED, VARIABLE, STRUCT
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
new file mode 100644
index 0000000..9d77010
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+/**
+ * Types of filters of select query
+ */
+public enum FilterType {
+  EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index bfa9d7e..f81f805 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,16 +17,22 @@
 
 package org.apache.carbondata.core.metadata.blocklet;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * class to store the information about the blocklet
  */
-public class BlockletInfo implements Serializable {
+public class BlockletInfo implements Serializable, Writable {
 
   /**
    * serialization id
@@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable {
     this.numberOfPages = numberOfPages;
   }
 
+  @Override public void write(DataOutput output) throws IOException {
+    output.writeLong(dimensionOffset);
+    output.writeLong(measureOffsets);
+    int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0;
+    output.writeShort(dsize);
+    for (int i = 0; i < dsize; i++) {
+      output.writeLong(dimensionChunkOffsets.get(i));
+    }
+    for (int i = 0; i < dsize; i++) {
+      output.writeInt(dimensionChunksLength.get(i));
+    }
+    int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0;
+    output.writeShort(mSize);
+    for (int i = 0; i < mSize; i++) {
+      output.writeLong(measureChunkOffsets.get(i));
+    }
+    for (int i = 0; i < mSize; i++) {
+      output.writeInt(measureChunksLength.get(i));
+    }
+  }
+
+  @Override public void readFields(DataInput input) throws IOException {
+    dimensionOffset = input.readLong();
+    measureOffsets = input.readLong();
+    short dimensionChunkOffsetsSize = input.readShort();
+    dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
+    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+      dimensionChunkOffsets.add(input.readLong());
+    }
+    dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize);
+    for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+      dimensionChunksLength.add(input.readInt());
+    }
+
+    short measureChunkOffsetsSize = input.readShort();
+    measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize);
+    for (int i = 0; i < measureChunkOffsetsSize; i++) {
+      measureChunkOffsets.add(input.readLong());
+    }
+    measureChunksLength = new ArrayList<>(measureChunkOffsetsSize);
+    for (int i = 0; i < measureChunkOffsetsSize; i++) {
+      measureChunksLength.add(input.readInt());
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
index cd86a07..ae99ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.metadata.index;
 
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 
 /**
@@ -45,6 +46,11 @@ public class BlockIndexInfo {
   private BlockletIndex blockletIndex;
 
   /**
+   * to store blocklet info like offsets and lengths of each column.
+   */
+  private BlockletInfo blockletInfo;
+
+  /**
    * Constructor
    *
    * @param numberOfRows  number of rows
@@ -61,6 +67,20 @@ public class BlockIndexInfo {
   }
 
   /**
+   *
+   * @param numberOfRows
+   * @param fileName
+   * @param offset
+   * @param blockletIndex
+   * @param blockletInfo
+   */
+  public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+      BlockletIndex blockletIndex, BlockletInfo blockletInfo) {
+    this(numberOfRows, fileName, offset, blockletIndex);
+    this.blockletInfo = blockletInfo;
+  }
+
+  /**
    * @return the numberOfRows
    */
   public long getNumberOfRows() {
@@ -87,4 +107,11 @@ public class BlockIndexInfo {
   public BlockletIndex getBlockletIndex() {
     return blockletIndex;
   }
+
+  /**
+   * @return BlockletInfo
+   */
+  public BlockletInfo getBlockletInfo() {
+    return blockletInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..e0ee5bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -116,23 +119,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // so block will be loaded in sorted order this will be required for
     // query execution
     Collections.sort(queryModel.getTableBlockInfos());
-    // get the table blocks
-    CacheProvider cacheProvider = CacheProvider.getInstance();
-    BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
-        (BlockIndexStore) cacheProvider
-            .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
-    // remove the invalid table blocks, block which is deleted or compacted
-    cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
-        queryModel.getAbsoluteTableIdentifier());
-    List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-        prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
-            queryModel.getAbsoluteTableIdentifier());
-    cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
-    queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
-    queryStatistic
-        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
-    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
 
+    if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
+      List<AbstractIndex> indexList = new ArrayList<>();
+      Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+      for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
+        List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+        if (tableBlockInfos == null) {
+          tableBlockInfos = new ArrayList<>();
+          listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+        }
+        tableBlockInfos.add(blockInfo);
+      }
+      for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
+        indexList.add(new IndexWrapper(tableBlockInfos));
+      }
+      queryProperties.dataBlocks = indexList;
+    } else {
+      // get the table blocks
+      CacheProvider cacheProvider = CacheProvider.getInstance();
+      BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
+          (BlockIndexStore) cacheProvider
+              .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
+      // remove the invalid table blocks, block which is deleted or compacted
+      cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+          queryModel.getAbsoluteTableIdentifier());
+      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+          prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
+              queryModel.getAbsoluteTableIdentifier());
+      cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
+      queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
+      queryStatistic
+          .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
+      queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+    }
     // calculating the total number of aggeragted columns
     int aggTypeCount = queryModel.getQueryMeasures().size();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..a874835 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
     int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
 
-    boolean isScanRequired =
+    boolean isScanRequired =  blockIndex >= blkMaxVal.length ||
         isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 6823531..c2e077e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
     BitSet bitSet = new BitSet(1);
     byte[][] filterValues = this.filterRangesValues;
     int columnIndex = this.dimColEvaluatorInfo.getColumnIndex();
-    boolean isScanRequired =
+    boolean isScanRequired = columnIndex >= blockMinValue.length ||
         isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index be82be7..73352cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired =  dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 53da6c5..6e8e188 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired =  dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d694960..d6f7c86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index b3dd921..597ba52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
     BitSet bitSet = new BitSet(1);
-    boolean isScanRequired =
+    boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
         isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
     if (isScanRequired) {
       bitSet.set(0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index fdb5483..ff4f5dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
         new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
             blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
     blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+    if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) {
+      return blocksChunkHolder;
+    }
     if (blockletScanner.isScanRequired(blocksChunkHolder)) {
       return blocksChunkHolder;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 92e9594..95030d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
         // set the deleted row to block execution info
         blockInfo.setDeletedRecordsMap(deletedRowsMap);
       }
-      DataRefNode startDataBlock = finder
-          .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
-      while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
-        startDataBlock = startDataBlock.getNextDataRefNode();
-      }
-      long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
-      //if number of block is less than 0 then take end block.
-      if (numberOfBlockToScan <= 0) {
-        DataRefNode endDataBlock = finder
-            .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
-        numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+      DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
+      if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
+        BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
+        blockInfo.setFirstDataBlock(wrapper);
+        blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
+
+      } else {
+        DataRefNode startDataBlock =
+            finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
+        while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+          startDataBlock = startDataBlock.getNextDataRefNode();
+        }
+        long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+        //if number of block is less than 0 then take end block.
+        if (numberOfBlockToScan <= 0) {
+          DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
+          numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+        }
+        blockInfo.setFirstDataBlock(startDataBlock);
+        blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
       }
-      blockInfo.setFirstDataBlock(startDataBlock);
-      blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 97b1a1f..34c7709 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter {
   }
 
   /**
+   * Below method will be used to get the index info from index file
+   *
+   * @param filePath           file path of the index file
+   * @return list of index info
+   * @throws IOException problem while reading the index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath) throws IOException {
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+    String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+    try {
+      // open the reader
+      indexReader.openThriftReader(filePath);
+      // get the index header
+      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns =
+          readIndexHeader.getTable_columns();
+      for (int i = 0; i < table_columns.size(); i++) {
+        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+      }
+      // get the segment info
+      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+      BlockletIndex blockletIndex = null;
+      DataFileFooter dataFileFooter = null;
+      // read the block info from file
+      while (indexReader.hasNext()) {
+        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+        dataFileFooter = new DataFileFooter();
+        TableBlockInfo tableBlockInfo = new TableBlockInfo();
+        tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+        tableBlockInfo.setVersion(
+            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+        int blockletSize = getBlockletSize(readBlockIndexInfo);
+        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+        tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+        dataFileFooter.setBlockletIndex(blockletIndex);
+        dataFileFooter.setColumnInTable(columnSchemaList);
+        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+        dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+        dataFileFooter.setSegmentInfo(segmentInfo);
+        dataFileFooters.add(dataFileFooter);
+      }
+    } finally {
+      indexReader.closeThriftReader();
+    }
+    return dataFileFooters;
+  }
+
+  /**
    * the methods returns the number of blocklets in a block
    *
    * @param readBlockIndexInfo
@@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter {
   public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
       throws IOException;
 
+  public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException;
+
   /**
    * Below method will be used to get blocklet index for data file meta
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9c164a..f62f3d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -54,10 +54,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -926,10 +929,26 @@ public final class CarbonUtil {
    * Below method will be used to read the data file matadata
    */
   public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
-    AbstractDataFileFooterConverter fileFooterConverter =
-        DataFileFooterConverterFactory.getInstance()
-            .getDataFileFooterConverter(tableBlockInfo.getVersion());
-    return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+    BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
+    if (detailInfo == null) {
+      AbstractDataFileFooterConverter fileFooterConverter =
+          DataFileFooterConverterFactory.getInstance()
+              .getDataFileFooterConverter(tableBlockInfo.getVersion());
+      return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+    } else {
+      DataFileFooter fileFooter = new DataFileFooter();
+      fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
+      ColumnarFormatVersion version =
+          ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+      AbstractDataFileFooterConverter dataFileFooterConverter =
+          DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+      fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+      SegmentInfo segmentInfo = new SegmentInfo();
+      segmentInfo.setColumnCardinality(detailInfo.getDimLens());
+      segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
+      fileFooter.setSegmentInfo(segmentInfo);
+      return fileFooter;
+    }
   }
 
   /**
@@ -1567,24 +1586,23 @@ public final class CarbonUtil {
   }
 
   /**
-   * @param tableInfo
    * @param invalidBlockVOForSegmentId
    * @param updateStatusMngr
    * @return
    */
-  public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+  public static boolean isInvalidTableBlock(String segmentId, String filePath,
       UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) {
 
-    if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
-        CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+    if (!updateStatusMngr.isBlockValid(segmentId,
+        CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath
             .getCarbonDataExtension())) {
       return true;
     }
 
     if (null != invalidBlockVOForSegmentId) {
-      Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
-          .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
-              tableInfo.getFilePath().lastIndexOf('.')));
+      Long blockTimeStamp = Long.parseLong(filePath
+          .substring(filePath.lastIndexOf('-') + 1,
+              filePath.lastIndexOf('.')));
       if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && (
           invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null
               && blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 0f82b95..3ac6987 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
     blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
     return blockletInfo;
   }
+
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 4882b0f..8cd437f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
     return numberOfDimensionColumns;
   }
 
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 143c1b1..ccb8b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     return dataFileFooter;
   }
 
+  @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
+    FileHeader fileHeader = carbonHeaderReader.readHeader();
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    return columnSchemaList;
+  }
+
   /**
    * Below method is to convert the blocklet info of the thrift to wrapper
    * blocklet info

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index c055031..4df085a 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -41,4 +41,5 @@ struct BlockIndex{
   2: required string file_name; // Block file name
   3: required i64 offset; // Offset of the footer
   4: required carbondata.BlockletIndex block_index;	// Blocklet index
+  5: optional carbondata.BlockletInfo3 blocklet_info;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1673193..81226a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -21,7 +21,14 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
@@ -375,8 +382,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         if (isIUDTable) {
           // In case IUD is not performed in this table avoid searching for
           // invalidated blocks.
-          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
-              updateStatusManager)) {
+          if (CarbonUtil
+              .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+                  invalidBlockVOForSegmentId, updateStatusManager)) {
             continue;
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 631bc2c..56bade7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -77,6 +78,8 @@ public class CarbonInputSplit extends FileSplit
    */
   private String[] deleteDeltaFiles;
 
+  private BlockletDetailInfo detailInfo;
+
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
@@ -138,10 +141,12 @@ public class CarbonInputSplit extends FileSplit
       BlockletInfos blockletInfos =
           new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
       try {
-        tableBlockInfoList.add(
+        TableBlockInfo blockInfo =
             new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
                 split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
-                split.getDeleteDeltaFiles()));
+                split.getDeleteDeltaFiles());
+        blockInfo.setDetailInfo(split.getDetailInfo());
+        tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);
       }
@@ -153,9 +158,12 @@ public class CarbonInputSplit extends FileSplit
     BlockletInfos blockletInfos =
         new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
     try {
-      return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
-          inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
-          blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+      TableBlockInfo blockInfo =
+          new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+              inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+              blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+      blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+      return blockInfo;
     } catch (IOException e) {
       throw new RuntimeException("fail to get location of split: " + inputSplit, e);
     }
@@ -180,6 +188,11 @@ public class CarbonInputSplit extends FileSplit
     for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
       deleteDeltaFiles[i] = in.readUTF();
     }
+    boolean detailInfoExists = in.readBoolean();
+    if (detailInfoExists) {
+      detailInfo = new BlockletDetailInfo();
+      detailInfo.readFields(in);
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -197,6 +210,10 @@ public class CarbonInputSplit extends FileSplit
         out.writeUTF(deleteDeltaFiles[i]);
       }
     }
+    out.writeBoolean(detailInfo != null);
+    if (detailInfo != null) {
+      detailInfo.write(out);
+    }
   }
 
   public List<String> getInvalidSegments() {
@@ -310,4 +327,16 @@ public class CarbonInputSplit extends FileSplit
   public String[] getDeleteDeltaFiles() {
     return deleteDeltaFiles;
   }
+
+  public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
+    this.deleteDeltaFiles = deleteDeltaFiles;
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    return detailInfo;
+  }
+
+  public void setDetailInfo(BlockletDetailInfo detailInfo) {
+    this.detailInfo = detailInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ae9c676..e73c04a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -18,152 +18,556 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.internal.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Input format of CarbonData file.
+ *
  * @param <T>
  */
 public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  // comma separated list of input files
+  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+  private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
-  private SegmentManager segmentManager;
+  /**
+   * It is optional, if user does not set then it reads from store
+   *
+   * @param configuration
+   * @param carbonTable
+   * @throws IOException
+   */
+  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+      throws IOException {
+    if (null != carbonTable) {
+      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    }
+  }
 
-  public CarbonTableInputFormat() {
-    this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager();
+  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+    String carbonTableStr = configuration.get(CARBON_TABLE);
+    if (carbonTableStr == null) {
+      populateCarbonTable(configuration);
+      // read it from schema file in the store
+      carbonTableStr = configuration.get(CARBON_TABLE);
+      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+    }
+    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
-  @Override
-  public RecordReader<Void, T> createRecordReader(InputSplit split,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    switch (((CarbonInputSplit)split).formatType()) {
-      case COLUMNAR:
-        // TODO: create record reader for columnar format
-        break;
-      default:
-        throw new RuntimeException("Unsupported format type");
+  /**
+   * this method will read the schema from the physical file and populate into CARBON_TABLE
+   *
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration) throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
     }
-    return null;
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
   }
 
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
+  public static void setTablePath(Configuration configuration, String tablePath)
+      throws IOException {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
 
-    // work as following steps:
-    // get all current valid segment
-    // for each segment, get all input split
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
 
-    List<InputSplit> output = new LinkedList<>();
-    Expression filter = getFilter(job.getConfiguration());
-    Segment[] segments = segmentManager.getAllValidSegments();
-    FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null);
-    for (Segment segment: segments) {
-      List<InputSplit> splits = segment.getSplits(job, filterResolver);
-      output.addAll(splits);
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
     }
-    return output;
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
   }
 
-  /**
-   * set the table path into configuration
-   * @param conf configuration of the job
-   * @param tablePath table path string
-   */
-  public void setTablePath(Configuration conf, String tablePath) {
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+  public static void setCarbonReadSupport(Configuration configuration,
+      Class<? extends CarbonReadSupport> readSupportClass) {
+    if (readSupportClass != null) {
+      configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+    }
+  }
 
+  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(absIdentifier);
   }
 
   /**
-   * return the table path in the configuration
-   * @param conf configuration of the job
-   * @return table path string
+   * Set list of segments to access
    */
-  public String getTablePath(Configuration conf) {
-    return null;
+  public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
   }
 
   /**
-   * set projection columns into configuration
-   * @param conf configuration of the job
-   * @param projection projection
+   * Set list of files to access
    */
-  public void setProjection(Configuration conf, CarbonProjection projection) {
+  public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+    configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+  }
 
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
   }
 
   /**
-   * return the projection in the configuration
-   * @param conf configuration of the job
-   * @return projection
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR
+   * are used to get table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
    */
-  public CarbonProjection getProjection(Configuration conf) {
-    return null;
+  @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+    List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+    // get all valid segments and set them into the configuration
+    if (validSegments.size() == 0) {
+      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+          segmentStatusManager.getValidAndInvalidSegments();
+      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+      validSegments = segments.getValidSegments();
+      if (validSegments.size() == 0) {
+        return new ArrayList<>(0);
+      }
+
+      // remove entry in the segment index if there are invalid segments
+      invalidSegments.addAll(segments.getInvalidSegments());
+      for (String invalidSegmentId : invalidSegments) {
+        invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+      }
+      if (invalidSegments.size() > 0) {
+        List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+            new ArrayList<>(invalidSegments.size());
+        for (String segId : invalidSegments) {
+          invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+        }
+        blockletMap.clear(invalidSegments);
+      }
+    }
+
+    // process and resolve the expression
+    Expression filter = getFilterPredicates(job.getConfiguration());
+    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    // this will be null in case of corrupt schema file.
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
+
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+    // prune partitions for filter query on partition table
+    BitSet matchedPartitions = null;
+    if (null != filter) {
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+      if (null != partitionInfo) {
+        Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
+        matchedPartitions = new FilterExpressionProcessor()
+            .getFilteredPartitions(filter, partitionInfo, partitioner);
+        if (matchedPartitions.cardinality() == 0) {
+          // no partition is required
+          return new ArrayList<InputSplit>();
+        }
+        if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+          // all partitions are required, no need to prune partitions
+          matchedPartitions = null;
+        }
+      }
+    }
+
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+    // do block filtering and get split
+    List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions);
+    // pass the invalid segment to task side in order to remove index entry in task side
+    if (invalidSegments.size() > 0) {
+      for (InputSplit split : splits) {
+        ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+        ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
+            .setInvalidTimestampRange(invalidTimestampsList);
+      }
+    }
+    return splits;
   }
 
   /**
-   * set filter expression into the configuration
-   * @param conf configuration of the job
-   * @param filter filter expression
+   * {@inheritDoc}
+   * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
+   * are used to get table path to read.
+   *
+   * @return
+   * @throws IOException
    */
-  public void setFilter(Configuration conf, Expression filter) {
+  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+      List<String> validSegments, BitSet matchedPartitions) throws IOException {
+
+    List<InputSplit> result = new LinkedList<InputSplit>();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean isIUDTable = false;
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+    //for each segment fetch blocks matching filter in Driver BTree
+    List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
+        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+            validSegments);
+    for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+      // Get the UpdateVO for those tables on which IUD operations being performed.
+      if (isIUDTable) {
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+      }
+      if (isIUDTable) {
+        // In case IUD is not performed in this table avoid searching for
+        // invalidated blocks.
+        if (CarbonUtil
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+                invalidBlockVOForSegmentId, updateStatusManager)) {
+          continue;
+        }
+      }
+      String[] deleteDeltaFilePath = null;
+      try {
+        deleteDeltaFilePath =
+            updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+      result.add(inputSplit);
+    }
+    return result;
+  }
+
+  protected Expression getFilterPredicates(Configuration configuration) {
     try {
-      String filterString = ObjectSerializationUtil.convertObjectToString(filter);
-      conf.set(FILTER_PREDICATE, filterString);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while setting filter expression to Job", e);
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
     }
   }
 
   /**
-   * return filter expression in the configuration
-   * @param conf configuration of the job
-   * @return filter expression
+   * get data blocks of given segment
    */
-  public Expression getFilter(Configuration conf) {
-    Object filter;
-    String filterExprString = conf.get(FILTER_PREDICATE);
-    if (filterExprString == null) {
-      return null;
+  private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+
+    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+        .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+    List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+
+    List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+    for (Blocklet blocklet : prunedBlocklets) {
+      int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+
+      // matchedPartitions variable will be null in two cases as follows
+      // 1. the table is not a partition table
+      // 2. the table is a partition table, and all partitions are matched by query
+      // for partition table, the task id of carbaondata file name is the partition id.
+      // if this partition is not required, here will skip it.
+      if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+        resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+      }
     }
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilterredBlocks;
+  }
+
+  private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
+      throws IOException {
+    blocklet.updateLocations();
+    org.apache.carbondata.hadoop.CarbonInputSplit split =
+        org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+            new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+    split.setDetailInfo(blocklet.getDetailInfo());
+    return split;
+  }
+
+  @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    CarbonTable carbonTable = getCarbonTable(configuration);
+    // getting the table absoluteTableIdentifier from the carbonTable
+    // to avoid unnecessary deserialization
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+    // query plan includes projection column
+    String projection = getColumnProjection(configuration);
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segment
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
+      }
+      List<UpdateVO> invalidTimestampRangeList =
+          split.getAllSplits().get(0).getInvalidTimestampRange();
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+      }
+    }
+    return queryModel;
+  }
+
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    //By default it uses dictionary decoder read class
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + "not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
+  }
+
+  @Override protected boolean isSplitable(JobContext context, Path filename) {
     try {
-      filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
-    } catch (IOException e) {
-      throw new RuntimeException("Error while reading filter expression", e);
+      // Don't split the file if it is local file system
+      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+      if (fileSystem instanceof LocalFileSystem) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  /**
+   * required to be moved to core
+   *
+   * @return updateExtension
+   */
+  private String getUpdateExtension() {
+    // TODO: required to modify when supporting update, mostly will be update timestamp
+    return "update";
+  }
+
+  /**
+   * return valid segment to access
+   */
+  private String[] getSegmentsToAccess(JobContext job) {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    if (segmentString.trim().isEmpty()) {
+      return new String[0];
     }
-    assert (filter instanceof Expression);
-    return (Expression) filter;
+    return segmentString.split(",");
   }
 
   /**
-   * Optional API. It can be used by query optimizer to select index based on filter
-   * in the configuration of the job. After selecting index internally, index' name will be set
-   * in the configuration.
+   * Get the row count of the Block and mapping of segment and Block count.
    *
-   * The process of selection is simple, just use the default index. Subclass can provide a more
-   * advanced selection logic like cost based.
-   * @param conf job configuration
+   * @param job
+   * @param identifier
+   * @return
+   * @throws IOException
+   * @throws KeyGenException
    */
-  public void selectIndex(Configuration conf) {
-    // set the default index in configuration
+  public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
+      throws IOException, KeyGenException {
+    TableDataMap blockletMap =
+        DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+        new SegmentStatusManager(identifier).getValidAndInvalidSegments();
+    Map<String, Long> blockRowCountMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Map<String, Long> segmentAndBlockCountMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+    for (Blocklet blocklet : blocklets) {
+      String blockName = blocklet.getPath().toString();
+      blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+      blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+      long rowCount = blocklet.getDetailInfo().getRowCount();
+
+      String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+
+      // if block is invalid then dont add the count
+      SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+        Long blockCount = blockRowCountMapping.get(key);
+        if (blockCount == null) {
+          blockCount = 0L;
+          Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+          if (count == null) {
+            count = 0L;
+          }
+          segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+        }
+        blockCount += rowCount;
+        blockRowCountMapping.put(key, blockCount);
+      }
+    }
+    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8270304..8269757 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryDimension;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -77,9 +77,10 @@ public class CarbonInputFormatUtil {
     return plan;
   }
 
-  public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+  public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
+      AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
-    CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+    CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
     return carbonInputFormat;
   }