You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/03/23 08:22:16 UTC

[3/5] incubator-carbondata git commit: Fix groupid, package name, Class name issues

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
deleted file mode 100755
index 692d69e..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
-import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
-import com.facebook.presto.carbondata.impl.CarbonTableReader;
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.connector.ConnectorSplitManager;
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.*;
-import com.google.common.collect.ImmutableList;
-import io.airlift.slice.Slice;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.scan.expression.ColumnExpression;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.*;
-import org.apache.carbondata.core.scan.expression.logical.AndExpression;
-import org.apache.carbondata.core.scan.expression.logical.OrExpression;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static com.facebook.presto.carbondata.Types.checkType;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-public class CarbondataSplitManager
-        implements ConnectorSplitManager
-{
-
-    private final String connectorId;
-    private final CarbonTableReader carbonTableReader;
-
-    @Inject
-    public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
-    {
-        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-        this.carbonTableReader = requireNonNull(reader, "client is null");
-    }
-
-    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
-    {
-        CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout;
-        CarbondataTableHandle tableHandle = layoutHandle.getTable();
-        SchemaTableName key = tableHandle.getSchemaTableName();
-
-        //get all filter domain
-        List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
-
-        CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
-        Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
-
-        if(cache != null) {
-            try {
-                List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
-
-                ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
-                for (CarbonLocalInputSplit split : splits) {
-                    cSplits.add(new CarbondataSplit(
-                            connectorId,
-                            tableHandle.getSchemaTableName(),
-                            layoutHandle.getConstraint(),
-                            split,
-                            rebuildConstraints
-                    ));
-                }
-                return new FixedSplitSource(cSplits.build());
-            } catch (Exception ex) {
-                System.out.println(ex.toString());
-            }
-        }
-        return null;
-    }
-
-
-    public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
-    {
-        ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
-        for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) {
-            CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
-            constraintBuilder.add(new CarbondataColumnConstraint(
-                    columnHandle.getColumnName(),
-                    Optional.of(columnDomain.getDomain()),
-                    columnHandle.isInvertedIndex()));
-        }
-
-        return constraintBuilder.build();
-    }
-
-
-    public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
-    {
-        ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
-        Domain domain = null;
-
-        for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
-            CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-            Type type = cdch.getColumnType();
-
-            List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
-            Optional<CarbonColumn>  target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
-
-            if(target.get() == null)
-                return null;
-
-            DataType coltype = target.get().getDataType();
-            ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
-            //colExpression.setColIndex(cs.getSchemaOrdinal());
-            colExpression.setDimension(target.get().isDimesion());
-            colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
-            colExpression.setCarbonColumn(target.get());
-
-            domain = originalConstraint.getDomains().get().get(c);
-            checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
-            if (domain.getValues().isNone()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
-                //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
-                //new Expression()
-            }
-
-            if (domain.getValues().isAll()) {
-                //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
-                //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
-            }
-
-            List<Object> singleValues = new ArrayList<>();
-            List<Expression> rangeFilter = new ArrayList<>();
-            for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
-                checkState(!range.isAll()); // Already checked
-                if (range.isSingleValue()) {
-                    singleValues.add(range.getLow().getValue());
-                }
-                else
-                {
-                    List<String> rangeConjuncts = new ArrayList<>();
-                    if (!range.getLow().isLowerUnbounded()) {
-                        Object value = ConvertDataByType(range.getLow().getValue(), type);
-                        switch (range.getLow().getBound()) {
-                            case ABOVE:
-                                if (type == TimestampType.TIMESTAMP) {
-                                    //todo not now
-                                } else {
-                                    GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                    //greater.setRangeExpression(true);
-                                    rangeFilter.add(greater);
-                                }
-                                break;
-                            case EXACTLY:
-                                GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //greater.setRangeExpression(true);
-                                rangeFilter.add(greater);
-                                break;
-                            case BELOW:
-                                throw new IllegalArgumentException("Low marker should never use BELOW bound");
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
-                        }
-                    }
-                    if (!range.getHigh().isUpperUnbounded()) {
-                        Object value = ConvertDataByType(range.getHigh().getValue(), type);
-                        switch (range.getHigh().getBound()) {
-                            case ABOVE:
-                                throw new IllegalArgumentException("High marker should never use ABOVE bound");
-                            case EXACTLY:
-                                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less.setRangeExpression(true);
-                                rangeFilter.add(less);
-                                break;
-                            case BELOW:
-                                LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                                //less2.setRangeExpression(true);
-                                rangeFilter.add(less2);
-                                break;
-                            default:
-                                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
-                        }
-                    }
-                }
-            }
-
-            if (singleValues.size() == 1) {
-                Expression ex = null;
-                if (coltype.equals(DataType.STRING)) {
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
-                } else
-                    ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
-                filters.add(ex);
-            }
-            else if(singleValues.size() > 1) {
-                ListExpression candidates = null;
-                List<Expression> exs = singleValues.stream().map((a) ->
-                {
-                    return new LiteralExpression(ConvertDataByType(a, type), coltype);
-                }).collect(Collectors.toList());
-                candidates = new ListExpression(exs);
-
-                if(candidates != null)
-                    filters.add(new InExpression(colExpression, candidates));
-            }
-            else if(rangeFilter.size() > 0){
-                if(rangeFilter.size() > 1) {
-                    Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
-                    if(rangeFilter.size() > 2)
-                    {
-                        for(int i = 2; i< rangeFilter.size(); i++)
-                        {
-                            filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
-                        }
-                    }
-                }
-                else if(rangeFilter.size() == 1)//only have one value
-                    filters.add(rangeFilter.get(0));
-            }
-        }
-
-        Expression finalFilters;
-        List<Expression> tmp = filters.build();
-        if(tmp.size() > 1) {
-            finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
-            if(tmp.size() > 2)
-            {
-                for(int i = 2; i< tmp.size(); i++)
-                {
-                    finalFilters = new AndExpression(finalFilters, tmp.get(i));
-                }
-            }
-        }
-        else if(tmp.size() == 1)
-            finalFilters = tmp.get(0);
-        else//no filter
-            return null;
-
-        return finalFilters;
-    }
-
-    public static DataType Spi2CarbondataTypeMapper(Type colType)
-    {
-        if(colType == BooleanType.BOOLEAN)
-            return DataType.BOOLEAN;
-        else if(colType == SmallintType.SMALLINT)
-            return DataType.SHORT;
-        else if(colType == IntegerType.INTEGER)
-            return DataType.INT;
-        else if(colType == BigintType.BIGINT)
-            return DataType.LONG;
-        else if(colType == DoubleType.DOUBLE)
-            return DataType.DOUBLE;
-        else if(colType == DecimalType.createDecimalType())
-            return DataType.DECIMAL;
-        else if(colType == VarcharType.VARCHAR)
-            return DataType.STRING;
-        else if(colType == DateType.DATE)
-            return DataType.DATE;
-        else if(colType == TimestampType.TIMESTAMP)
-            return DataType.TIMESTAMP;
-        else
-            return DataType.STRING;
-    }
-
-
-    public Object ConvertDataByType(Object rawdata, Type type)
-    {
-        if(type.equals(IntegerType.INTEGER))
-            return new Integer((rawdata.toString()));
-        else if(type.equals(BigintType.BIGINT))
-            return (Long)rawdata;
-        else if(type.equals(VarcharType.VARCHAR))
-            return ((Slice)rawdata).toStringUtf8();
-        else if(type.equals(BooleanType.BOOLEAN))
-            return (Boolean)(rawdata);
-
-        return rawdata;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
deleted file mode 100755
index 6b263e0..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.SchemaTableName;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Joiner;
-
-import java.util.Objects;
-
-import static java.util.Locale.ENGLISH;
-import static java.util.Objects.requireNonNull;
-
-public class CarbondataTableHandle
-        implements ConnectorTableHandle {
-
-    private final String connectorId;
-    private final SchemaTableName schemaTableName;
-
-    @JsonCreator
-    public CarbondataTableHandle(
-            @JsonProperty("connectorId") String connectorId,
-            @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
-    {
-        this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
-        this.schemaTableName = schemaTableName;
-    }
-
-    @JsonProperty
-    public String getConnectorId()
-    {
-        return connectorId;
-    }
-
-    @JsonProperty
-    public SchemaTableName getSchemaTableName()
-    {
-        return schemaTableName;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(connectorId, schemaTableName);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if ((obj == null) || (getClass() != obj.getClass())) {
-            return false;
-        }
-
-        CarbondataTableHandle other = (CarbondataTableHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName.equals(other.getSchemaTableName());
-    }
-
-    @Override
-    public String toString()
-    {
-        return Joiner.on(":").join(connectorId, schemaTableName.toString());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
deleted file mode 100755
index 01434bd..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
-
-public class CarbondataTableLayoutHandle
-        implements ConnectorTableLayoutHandle
-{
-    private final CarbondataTableHandle table;
-    private final TupleDomain<ColumnHandle> constraint;
-
-    @JsonCreator
-    public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
-                                       @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
-    {
-        this.table = requireNonNull(table, "table is null");
-        this.constraint = requireNonNull(constraint, "constraint is null");
-    }
-
-    @JsonProperty
-    public CarbondataTableHandle getTable()
-    {
-        return table;
-    }
-
-    @JsonProperty
-    public TupleDomain<ColumnHandle> getConstraint()
-    {
-        return constraint;
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
-        return Objects.equals(table, other.table)
-                && Objects.equals(constraint, other.constraint);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(table, constraint);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("table", table)
-                .add("constraint", constraint)
-                .toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
deleted file mode 100755
index a643a33..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-public enum CarbondataTransactionHandle
-        implements ConnectorTransactionHandle
-{
-    INSTANCE
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
deleted file mode 100755
index 5212dad..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata;
-
-import java.util.Locale;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-public class Types {
-    private Types() {}
-
-    public static <A, B extends A> B checkType(A value, Class<B> target, String name)
-    {
-        requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
-        checkArgument(target.isInstance(value),
-                "%s must be of type %s, not %s",
-                name,
-                target.getName(),
-                value.getClass().getName());
-        return target.cast(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
deleted file mode 100755
index 6084022..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-public class CarbonLocalInputSplit {
-
-    private static final long serialVersionUID = 3520344046772190207L;
-    private String segmentId;
-    private String path;
-    private long start;
-    private long length;
-    private List<String> locations;
-    private short version;
-    /**
-     * Number of BlockLets in a block
-     */
-    private int numberOfBlocklets = 0;
-
-
-    @JsonProperty
-    public short getVersion(){
-        return version;
-    }
-
-    @JsonProperty
-    public List<String> getLocations() {
-        return locations;
-    }
-
-    @JsonProperty
-    public long getLength() {
-        return length;
-    }
-
-    @JsonProperty
-    public long getStart() {
-        return start;
-    }
-
-    @JsonProperty
-    public String getPath() {
-        return path;
-    }
-
-    @JsonProperty
-    public String getSegmentId() {
-        return segmentId;
-    }
-
-    @JsonProperty
-    public int getNumberOfBlocklets() {
-        return numberOfBlocklets;
-    }
-
-    @JsonCreator
-    public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
-                                 @JsonProperty("path") String path,
-                                 @JsonProperty("start") long start,
-                                 @JsonProperty("length") long length,
-                                 @JsonProperty("locations") List<String> locations,
-                                 @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
-                                 @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
-                                 @JsonProperty("version") short version) {
-        this.path = path;
-        this.start = start;
-        this.length = length;
-        this.segmentId = segmentId;
-        this.locations = locations;
-        this.numberOfBlocklets = numberOfBlocklets;
-        //this.tableBlockInfo = tableBlockInfo;
-        this.version = version;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
deleted file mode 100755
index d47f2a5..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public class CarbonTableCacheModel {
-
-    public CarbonTableIdentifier carbonTableIdentifier;
-    public CarbonTablePath carbonTablePath;
-
-    public TableInfo tableInfo;
-    public CarbonTable carbonTable;
-    public String[] segments;
-
-    public boolean isValid()
-    {
-        if(carbonTable != null
-                && carbonTablePath != null
-                && carbonTableIdentifier != null)
-            return true;
-        else
-            return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
deleted file mode 100755
index cd52b85..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.presto.carbondata.impl;
-
-import io.airlift.configuration.Config;
-
-import javax.validation.constraints.NotNull;
-
-public class CarbonTableConfig {
-    //read from config
-    private String dbPtah;
-    private String tablePath;
-    private String storePath;
-
-    @NotNull
-    public String getDbPtah() {
-        return dbPtah;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setDbPtah(String dbPtah) {
-        this.dbPtah = dbPtah;
-        return this;
-    }
-
-    @NotNull
-    public String getTablePath() {
-        return tablePath;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setTablePath(String tablePath) {
-        this.tablePath = tablePath;
-        return this;
-    }
-
-    @NotNull
-    public String getStorePath() {
-        return storePath;
-    }
-
-    @Config("carbondata-store")
-    public CarbonTableConfig setStorePath(String storePath) {
-        this.storePath = storePath;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
deleted file mode 100755
index 40bb841..0000000
--- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
+++ /dev/null
@@ -1,736 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.facebook.presto.carbondata.impl;
-
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Inject;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.*;
-import org.apache.carbondata.core.datastore.block.*;
-import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.reader.ThriftReader;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.service.impl.PathFactory;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.hadoop.CacheClient;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.thrift.TBase;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static java.util.Objects.requireNonNull;
-
-public class CarbonTableReader {
-    //CarbonTableReader will be a facade of these utils
-    //[
-    // 1:CarbonMetadata,(logic table)
-    // 2:FileFactory, (physic table file)
-    // 3:CarbonCommonFactory, (offer some )
-    // 4:DictionaryFactory, (parse dictionary util)
-    //]
-
-    private CarbonTableConfig config;
-    private List<SchemaTableName> tableList;
-    private CarbonFile dbStore;
-    private FileFactory.FileType fileType;
-
-    private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;//as a cache for Carbon reader
-
-    /**
-     * Creates a reader for the given connector configuration and starts with an empty table cache.
-     *
-     * @param config connector configuration; must not be null
-     */
-    @Inject
-    public CarbonTableReader(CarbonTableConfig config){
-        this.cc = new ConcurrentHashMap<>();
-        this.config = requireNonNull(config, "CarbonTableConfig is null");
-    }
-
-    /**
-     * Returns the cached model of {@code table}. When the table is not cached yet (e.g. on a
-     * worker node that has not initialized its carbon metastore), the store handle, the table
-     * list and the table metadata are loaded first.
-     *
-     * @param table schema-qualified table name
-     * @return the cached model, or {@code null} when the table is still absent from the cache
-     */
-    public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
-        boolean alreadyCached = cc.containsKey(table);
-        if (!alreadyCached) {
-            // Resolve the store handle while FileFactory's class loader is the context class loader.
-            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
-                updateDbStore();
-            }
-            updateSchemaTables();
-            parseCarbonMetadata(table);
-        }
-
-        // get() yields null for a missing key, which covers the "not loaded" case.
-        return cc.get(table);
-    }
-
-    /**
-     * Lists the schema names, i.e. the names of the entries directly under the store path.
-     *
-     * @return the result of {@link #updateSchemaList()}
-     */
-    public List<String> getSchemaNames() {
-        List<String> schemaNames = updateSchemaList();
-        return schemaNames;
-    }
-
-    // Default PathFilter: accepts a path only when its file name is a carbon data file.
-    private static final PathFilter DefaultFilter =
-            path -> CarbonTablePath.isCarbonDataFile(path.getName());
-
-    /**
-     * Lazily resolves the store directory handle (and its file type) from the configured
-     * store path. Does nothing when the handle is already set.
-     *
-     * @return always {@code true}
-     * @throws RuntimeException wrapping any exception raised while obtaining the store handle
-     */
-    public boolean updateDbStore(){
-        if (dbStore != null) {
-            return true;
-        }
-
-        String storeLocation = config.getStorePath();
-        fileType = FileFactory.getFileType(storeLocation);
-        try {
-            dbStore = FileFactory.getCarbonFile(storeLocation, fileType);
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-        return true;
-    }
-
-    /**
-     * Makes sure the store handle is resolved and lists the names of its direct children.
-     *
-     * @return the child names, or an empty immutable list when no store handle is available
-     */
-    public List<String> updateSchemaList() {
-        updateDbStore();
-
-        if (dbStore == null) {
-            return ImmutableList.of();
-        }
-        return Stream.of(dbStore.listFiles())
-                .map(CarbonFile::getName)
-                .collect(Collectors.toList());
-    }
-
-
-    /**
-     * Lists the table names of a schema.
-     *
-     * @param schema schema name; must not be null
-     * @return the result of {@link #updateTableList(String)} for that schema
-     */
-    public Set<String> getTableNames(String schema) {
-        return updateTableList(requireNonNull(schema, "schema is null"));
-    }
-
-    /**
-     * Lists the names of the entries inside the store sub-directory named {@code dbName}.
-     *
-     * @param dbName schema (database) directory name to look for under the store path
-     * @return names of the children of the first matching directory, or an empty immutable
-     *         set when no directory with that name exists
-     * @throws RuntimeException if the store handle cannot be resolved
-     */
-    public Set<String> updateTableList(String dbName){
-        // This may be the first call made on the reader (via getTableNames), so resolve the
-        // store handle here instead of dereferencing a possibly-null dbStore.
-        updateDbStore();
-
-        Optional<CarbonFile> schemaDir = Stream.of(dbStore.listFiles())
-                .filter(dir -> dbName.equals(dir.getName()))
-                .findFirst();
-        if (!schemaDir.isPresent()) {
-            return ImmutableSet.of();
-        }
-        return Stream.of(schemaDir.get().listFiles())
-                .map(CarbonFile::getName)
-                .collect(Collectors.toSet());
-    }
-
-    /**
-     * Refreshes the table list and loads the metadata of the requested table.
-     *
-     * @param schemaTableName schema-qualified table name; must not be null
-     * @return the loaded table, or {@code null} when the table is not in the refreshed list
-     * @throws RuntimeException wrapping any failure of the table-list refresh
-     */
-    public CarbonTable getTable(SchemaTableName schemaTableName) {
-        // The refresh runs before the null check, as in the original ordering.
-        try {
-            updateSchemaTables();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        return loadTableMetadata(requireNonNull(schemaTableName, "schemaTableName is null"));
-    }
-
-
-    /**
-     * Rebuilds the list of known tables by walking the store: every child of every store
-     * entry becomes a (schema, table) pair, except for entries whose name ends with ".mdt".
-     *
-     * @throws RuntimeException if the store handle cannot be resolved
-     */
-    public void updateSchemaTables()
-    {
-        //update logic determine later
-        // Only the store handle is needed here; resolving it directly avoids the throwaway
-        // directory listing that updateSchemaList() would perform.
-        updateDbStore();
-
-        // Build into a local list and publish it once complete, so readers of tableList never
-        // observe a half-filled list.
-        List<SchemaTableName> refreshed = new ArrayList<>();
-        for (CarbonFile db : dbStore.listFiles()) {
-            if (db.getName().endsWith(".mdt")) {
-                continue;
-            }
-            for (CarbonFile table : db.listFiles()) {
-                refreshed.add(new SchemaTableName(db.getName(), table.getName()));
-            }
-        }
-        tableList = refreshed;
-    }
-
-    /**
-     * Looks the table up in the current table list and, when present, parses its metadata.
-     *
-     * @param schemaTableName table to look for
-     * @return the parsed table, or {@code null} when the table list does not contain it
-     */
-    private CarbonTable loadTableMetadata(SchemaTableName schemaTableName)
-    {
-        for (SchemaTableName candidate : tableList) {
-            if (candidate.equals(schemaTableName)) {
-                return parseCarbonMetadata(candidate);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Parses the carbon metadata of {@code table} into cc (the CarbonTableReader cache).
-     *
-     * A cache entry that reports itself valid is returned as is. Otherwise the schema file
-     * is read through thrift, converted to the wrapper TableInfo, registered with
-     * CarbonMetadata, and the populated model is stored in the cache.
-     *
-     * @param table schema-qualified table name
-     * @return the carbon table looked up from CarbonMetadata after loading
-     * @throws RuntimeException wrapping any failure while reading or converting the schema
-     **/
-    public CarbonTable parseCarbonMetadata(SchemaTableName table)
-    {
-        try {
-            // NOTE: this should probably live in a StoreFactory.
-            CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
-            if (cache.isValid()) {
-                return cache.carbonTable;
-            }
-
-            //Step1: get table meta path, load carbon table param
-            String storePath = config.getStorePath();
-            cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString());
-            cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
-
-            //Step2: read schema file
-            ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-                public TBase create() {
-                    return new org.apache.carbondata.format.TableInfo();
-                }
-            };
-            String schemaFilePath = cache.carbonTablePath.getSchemaFilePath();
-            ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
-            org.apache.carbondata.format.TableInfo tableInfo;
-            thriftReader.open();
-            try {
-                tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
-            } finally {
-                // Release the reader even when read() fails.
-                thriftReader.close();
-            }
-
-            // The format-level TableInfo has to be converted into the code-level TableInfo.
-            SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-            TableInfo wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
-                            storePath);
-            wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
-            // Load it into the CarbonMetadata repository.
-            CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
-            cache.tableInfo = wrapperTableInfo;
-            cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
-
-            // Register the model only once it is fully populated; a failed load must not leave
-            // a half-initialized entry behind for getCarbonCache() to hand out.
-            cc.put(table, cache);
-            return cache.carbonTable;
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    /**
-     * Computes the input splits of a table: one split per data block of every valid segment
-     * that passes the resolved filter and the update-status validity check.
-     *
-     * @param tableCacheModel cached table model; its {@code segments} field is overwritten with
-     *                        the currently valid segment ids
-     * @param filters         filter expression handed to CarbonInputFormatUtil for resolution
-     * @return the splits; empty when the table has no valid segment
-     * @throws Exception if segment status lookup or filter resolution fails; failures while
-     *                   reading the blocks of a segment are rethrown as RuntimeException
-     */
-    public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
-
-        // Process the filter and push it down so that it is applied on the segment index.
-        FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
-
-        AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
-        CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
-        // The client is closed on every exit path, including the early return for a table
-        // without valid segments and any exception thrown below.
-        try {
-            List<String> invalidSegments = new ArrayList<>();
-            List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-
-            // get all valid segments and set them into the configuration
-            SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
-            SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-            SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
-
-            tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
-            if (segments.getValidSegments().size() == 0) {
-                return new ArrayList<>(0);
-            }
-
-            // remove entry in the segment index if there are invalid segments
-            invalidSegments.addAll(segments.getInvalidSegments());
-            for (String invalidSegmentId : invalidSegments) {
-                invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
-            }
-            if (invalidSegments.size() > 0) {
-                List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
-                for (String segId : invalidSegments) {
-                    invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
-                }
-                cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
-            }
-
-            // get filter for segment
-            CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
-            FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
-
-            List<CarbonLocalInputSplit> result = new ArrayList<>();
-            //for each segment fetch blocks matching filter in Driver BTree
-            for (String segmentNo : tableCacheModel.segments) {
-                try {
-                    List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier, tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
-                    for (DataRefNode dataRefNode : dataRefNodes) {
-                        BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
-                        TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
-
-                        if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
-                            continue;
-                        }
-                        result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
-                                tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-                                Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
-                                tableBlockInfo.getVersion().number()));
-                    }
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-            }
-            return result;
-        } finally {
-            cacheClient.close();
-        }
-    }
-
-    /**
-     * Collects the data blocks of one segment. Without a resolver every block of each index
-     * is taken; with a resolver only the blocks selected by the filter processor are taken.
-     *
-     * @return the collected blocks; empty when the segment has no index map
-     * @throws IOException if the segment indexes cannot be loaded
-     */
-    private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
-                                                     AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                     CarbonTablePath tablePath,
-                                                     FilterResolverIntf resolver,
-                                                     String segmentId,
-                                                     CacheClient cacheClient,
-                                                     SegmentUpdateStatusManager updateStatusManager) throws IOException {
-        // Read the index held inside the segment.
-        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> indexesByTask =
-                getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
-                        updateStatusManager);
-
-        List<DataRefNode> matchedBlocks = new LinkedList<DataRefNode>();
-        if (indexesByTask == null) {
-            return matchedBlocks;
-        }
-
-        for (AbstractIndex index : indexesByTask.values()) {
-            List<DataRefNode> blocksOfIndex;
-            if (resolver != null) {
-                // apply filter and get matching blocks
-                blocksOfIndex = filterExpressionProcessor
-                        .getFilterredBlocks(index.getDataRefNode(), resolver, index,
-                                absoluteTableIdentifier);
-            } else {
-                // no filter: take every block of the BTree index
-                blocksOfIndex = getDataBlocksOfIndex(index);
-            }
-            matchedBlocks.addAll(blocksOfIndex);
-        }
-        return matchedBlocks;
-    }
-
-    /**
-     * Tells whether the update details carry a latest-update timestamp that is newer than
-     * the refresh timestamp of the cached segment index wrapper.
-     */
-    private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
-                                    UpdateVO updateDetails) {
-        Long latestUpdate = updateDetails.getLatestUpdateTimestamp();
-        return latestUpdate != null
-                && latestUpdate > segmentTaskIndexWrapper.getRefreshedTimeStamp();
-    }
-
-    /**
-     * Returns the task-to-index map of a segment. The map is taken from the segment cache when
-     * present; when it is missing, or when the update details are newer than the cached
-     * wrapper, the segment's data files are listed, turned into TableBlockInfo entries and
-     * handed to the segment access client to (re)load the index.
-     *
-     * @throws IOException declared for the file listing / split computation helpers
-     */
-    private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
-                                                                                                AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                                                                CarbonTablePath tablePath,
-                                                                                                String segmentId,
-                                                                                                CacheClient cacheClient,
-                                                                                                SegmentUpdateStatusManager updateStatusManager) throws IOException {
-        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
-        SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-        boolean isSegmentUpdated = false;
-        // Stays null unless the cached segment turned out to be updated (see below).
-        Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
-        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
-                new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-        segmentTaskIndexWrapper =
-                cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
-        UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
-        if (null != segmentTaskIndexWrapper) {
-            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-            if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
-                taskKeys = segmentIndexMap.keySet();
-                isSegmentUpdated = true;
-            }
-        }
-
-        // if segment tree is not loaded, load the segment tree
-        if (segmentIndexMap == null || isSegmentUpdated) {
-
-            List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
-            // NOTE(review): 'segs' is filled but never read in this method.
-            List<String> segs = new ArrayList<>();
-            segs.add(segmentId);
-
-            FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
-            List<InputSplit> splits = getSplit(fileStatusList, fs);
-
-            // Keep only the splits whose path maps to a valid segment id.
-            List<FileSplit> carbonSplits = new ArrayList<>();
-            for (InputSplit inputSplit : splits) {
-                FileSplit fileSplit = (FileSplit) inputSplit;
-                String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());// open question from the author: how should the separator be added here?
-                if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
-                    continue;
-                }
-                carbonSplits.add(fileSplit);
-            }
-
-            List<TableBlockInfo> tableBlockInfoList  = new ArrayList<>();
-            for (FileSplit inputSplit : carbonSplits) {
-                if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
-
-                    BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
-                    tableBlockInfoList.add(
-                            new TableBlockInfo(inputSplit.getPath().toString(),
-                                    inputSplit.getStart(),
-                                    segmentId,
-                                    inputSplit.getLocations(),
-                                    inputSplit.getLength(),
-                                    blockletInfos,
-                                    ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
-                                    null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));// open question from the author: can this null cause an exception?
-                }
-            }
-
-            Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
-            segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
-            // get Btree blocks for given segment
-            tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
-            tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
-            segmentTaskIndexWrapper =
-                    cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
-            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
-        }
-        return segmentIndexMap;
-    }
-
-    /**
-     * Decides whether a data file split should be loaded into the segment index.
-     *
-     * A null split or a block that the update status manager reports invalid is rejected.
-     * Without task keys every remaining block is accepted. With task keys a block is accepted
-     * only when its timestamp is not older than the update delta start timestamp and its
-     * (task, bucket) pair is not among the task keys.
-     */
-    private boolean isValidBlockBasedOnUpdateDetails(
-            Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
-            UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
-        if (carbonInputSplit == null) {
-            return false;
-        }
-
-        String fileName = carbonInputSplit.getPath().getName();
-        if (!updateStatusManager.isBlockValid(segmentId, fileName)) {
-            return false;
-        }
-        if (taskKeys == null) {
-            return true;
-        }
-
-        String taskID = CarbonTablePath.DataFileUtil.getTaskNo(fileName);
-        String bucketNo = CarbonTablePath.DataFileUtil.getBucketNo(fileName);
-        SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
-                new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
-
-        // The block timestamp sits between the last '-' and the last '.' of the file name.
-        String blockTimestamp =
-                fileName.substring(fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.'));
-        boolean olderThanDeltaStart = updateDetails.getUpdateDeltaStartTimestamp() != null
-                && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp();
-
-        return !olderThanDeltaStart && !taskKeys.contains(taskBucketHolder);
-    }
-
-    /**
-     * Turns file statuses into input splits. Empty files yield one zero-length split without
-     * hosts; non-splittable files yield one split for the whole file; other files are cut
-     * into block-size pieces, where the last piece may be up to 1.1 times the split size.
-     *
-     * @param fileStatusList files to split
-     * @param targetSystem   file system queried for block locations when a status carries none
-     * @return the splits in file order
-     * @throws IOException if block locations cannot be fetched
-     */
-    private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)  throws IOException  {
-        List<InputSplit> splits = new ArrayList<>();
-
-        for (FileStatus file : fileStatusList) {
-            Path path = file.getPath();
-            long length = file.getLen();
-
-            if (length == 0L) {
-                splits.add(new FileSplit(path, 0L, length, new String[0]));
-                continue;
-            }
-
-            BlockLocation[] blkLocations;
-            if (file instanceof LocatedFileStatus) {
-                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
-            } else {
-                blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
-            }
-
-            if (!this.isSplitable()) {
-                splits.add(new FileSplit(path, 0L, length, blkLocations[0].getHosts()));
-                continue;
-            }
-
-            long splitSize = this.computeSplitSize(file.getBlockSize(), 1, Long.MAX_VALUE);
-            long bytesRemaining = length;
-            while ((double) bytesRemaining / (double) splitSize > 1.1D) {
-                int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
-                bytesRemaining -= splitSize;
-            }
-
-            if (bytesRemaining != 0L) {
-                int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
-            }
-        }
-        return splits;
-    }
-
-    /** Returns the partitions to scan; currently always the single partition "0". */
-    private String[] getValidPartitions() {
-        //TODO: has to Identify partitions by partition pruning
-        String[] partitions = { "0" };
-        return partitions;
-    }
-
-    /**
-     * Lists the carbon data files of the given segments (for every valid partition) and
-     * appends their statuses to {@code result}.
-     *
-     * NOTE(review): a failure while listing a segment directory is only printed to stdout and
-     * the segment is skipped, so callers cannot tell "no files" from "listing failed".
-     *
-     * @param segmentsToConsider segment ids to list
-     * @param tablePath          table path used to derive each segment directory
-     * @param result             output list receiving the matching file statuses
-     * @return the file system of the last segment path that was resolved, or {@code null}
-     *         when none was resolved
-     * @throws IOException if there is no partition to consider
-     */
-    private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
-                                               CarbonTablePath tablePath,
-                                               List<FileStatus> result) throws IOException {
-        String[] partitionsToConsider = getValidPartitions();
-        if (partitionsToConsider.length == 0) {
-            throw new IOException("No partitions/data found");
-        }
-
-        FileSystem fs = null;
-
-        //get all data files of valid partitions and segments
-        for (String partition : partitionsToConsider) {
-            for (String segmentId : segmentsToConsider) {
-                Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
-
-                try {
-                    Configuration conf = new Configuration();
-                    fs = segmentPath.getFileSystem(conf);
-
-                    RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(segmentPath);
-                    while (entries.hasNext()) {
-                        LocatedFileStatus stat = entries.next();
-                        // see the implementation of carbondata's carbonInputFilter
-                        if (!DefaultFilter.accept(stat.getPath())) {
-                            continue;
-                        }
-                        if (stat.isDirectory()) {
-                            addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
-                        } else {
-                            result.add(stat);
-                        }
-                    }
-                } catch (Exception ex) {
-                    System.out.println(ex.toString());
-                }
-            }
-        }
-        return fs;
-    }
-
-    /**
-     * Walks {@code path} and appends every entry accepted by {@code inputFilter} to
-     * {@code result}, descending into accepted directories.
-     *
-     * @throws IOException if a directory cannot be listed
-     */
-    protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
-        RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
-
-        while (entries.hasNext()) {
-            LocatedFileStatus stat = entries.next();
-            if (!inputFilter.accept(stat.getPath())) {
-                continue;
-            }
-            if (stat.isDirectory()) {
-                this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-            } else {
-                result.add(stat);
-            }
-        }
-    }
-
-    /**
-     * Collects every data block of the given BTree index, from the first block found for the
-     * default start key through the last block found for the default end key.
-     *
-     * @return the blocks in chain order; empty when key generation fails (the failure is
-     *         printed to stdout)
-     */
-    private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
-        List<DataRefNode> blocks = new LinkedList<DataRefNode>();
-        SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
-
-        try {
-            IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
-            IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
-
-            DataRefNodeFinder blockFinder =
-                    new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
-            DataRefNode first =
-                    blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
-            DataRefNode last =
-                    blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
-
-            // Follow the chain up to, but not including, the last block, then add it explicitly.
-            for (DataRefNode node = first; node != last; node = node.getNextDataRefNode()) {
-                blocks.add(node);
-            }
-            blocks.add(last);
-        } catch (KeyGenException e) {
-            System.out.println("Could not generate start key" + e.getMessage());
-        }
-        return blocks;
-    }
-
-    /** Files are split on every file system except the local one. */
-    private boolean isSplitable() {
-        return this.fileType != FileFactory.FileType.LOCAL;
-    }
-
-    /** Clamps the block size: at most {@code maxSize}, then at least {@code minSize}. */
-    private long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-        long capped = Math.min(maxSize, blockSize);
-        return Math.max(minSize, capped);
-    }
-
-    /** Builds a file split for the given byte range and host list. */
-    private FileSplit makeSplit(Path file, long start, long length,
-                                String[] hosts) {
-        FileSplit split = new FileSplit(file, start, length, hosts);
-        return split;
-    }
-
-    /**
-     * Finds the index of the block location that contains {@code offset}.
-     *
-     * @throws IllegalArgumentException when no block location covers the offset
-     */
-    private int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-        for (int idx = 0; idx < blkLocations.length; idx++) {
-            long blockStart = blkLocations[idx].getOffset();
-            long blockEnd = blockStart + blkLocations[idx].getLength();
-            // block covers the half-open range [blockStart, blockEnd)
-            if (blockStart <= offset && offset < blockEnd) {
-                return idx;
-            }
-        }
-
-        BlockLocation last = blkLocations[blkLocations.length - 1];
-        long fileLength = last.getOffset() + last.getLength() - 1;
-        throw new IllegalArgumentException("Offset " + offset +
-                " is outside of file (0.." +
-                fileLength + ")");
-    }
-
-
-    /**
-     * get total number of rows. for count(*)
-     *
-     * Currently a stub that always returns 0. The disabled implementation submitted one
-     * block-loading task per segment to a fixed thread pool and summed
-     * {@code AbstractIndex.getTotalNumberOfRows()} over the loaded indexes.
-     *
-     * @return always 0
-     * @throws IOException           declared for the intended implementation; not thrown today
-     * @throws IndexBuilderException declared for the intended implementation; not thrown today
-     */
-    public long getRowCount() throws IOException, IndexBuilderException {
-        return 0L;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..f2f69d9
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Filter constraint on one CarbonData column: its name, an optional Presto {@link Domain}
+ * and the {@code invertedindexed} flag. Instances are JSON-mapped via Jackson annotations.
+ */
+public class CarbondataColumnConstraint {
+    private final String name;
+    private final boolean invertedindexed;
+    private Optional<Domain> domain;
+
+    /**
+     * @param name column name, must not be null
+     * @param domain domain of the column, must not be null
+     * @param invertedindexed flag stored as given; a primitive needs no null check
+     */
+    @JsonCreator
+    public CarbondataColumnConstraint(
+            @JsonProperty("name") String name,
+            @JsonProperty("domain") Optional<Domain> domain,
+            @JsonProperty("invertedindexed") boolean invertedindexed) {
+        this.name = requireNonNull(name, "name is null");
+        this.invertedindexed = invertedindexed;
+        this.domain = requireNonNull(domain, "domain is null");
+    }
+
+    @JsonProperty
+    public boolean isInvertedindexed() {
+        return invertedindexed;
+    }
+
+    @JsonProperty
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    public Optional<Domain> getDomain() {
+        return domain;
+    }
+
+    /** Replaces the domain; unlike the constructor, no null check is applied here. */
+    @JsonSetter
+    public void setDomain(Optional<Domain> domain) {
+        this.domain = domain;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, domain, invertedindexed);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+        CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+        return this.invertedindexed == other.invertedindexed
+                && Objects.equals(this.name, other.name)
+                && Objects.equals(this.domain, other.domain);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("name", this.name)
+                .add("invertedindexed", this.invertedindexed)
+                .add("domain", this.domain)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
new file mode 100755
index 0000000..252556a
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link ColumnHandle} describing one CarbonData column; JSON-mapped via Jackson.
+ *
+ * <p>ordinalPosition is the number of the column in the entire list of columns of the table.
+ * It does not depend on the query: for "select clm3, clm0, clm1 from tablename" the handle
+ * of clm3 still has ordinalPosition = 3.
+ *
+ * <p>equals and hashCode consider only connectorId and columnName.
+ */
+public class CarbondataColumnHandle
+        implements ColumnHandle {
+    private final String connectorId;
+    private final String columnName;
+    private final Type columnType;
+    private final int ordinalPosition;
+    private final int keyOrdinal;
+    private final int columnGroupOrdinal;
+    private final boolean isMeasure;
+    private final int columnGroupId;
+    // May be null: the constructor applies no null check to it.
+    private final String columnUniqueId;
+    private final boolean isInvertedIndex;
+
+    @JsonCreator
+    public CarbondataColumnHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("columnName") String columnName,
+            @JsonProperty("columnType") Type columnType,
+            @JsonProperty("ordinalPosition") int ordinalPosition,
+            @JsonProperty("keyOrdinal") int keyOrdinal,
+            @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+            @JsonProperty("isMeasure") boolean isMeasure,
+            @JsonProperty("columnGroupId") int columnGroupId,
+            @JsonProperty("columnUniqueId") String columnUniqueId,
+            @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+        this.columnName = requireNonNull(columnName, "columnName is null");
+        this.columnType = requireNonNull(columnType, "columnType is null");
+        // Primitives can never be null, so they are assigned without a null check.
+        this.ordinalPosition = ordinalPosition;
+        this.keyOrdinal = keyOrdinal;
+        this.columnGroupOrdinal = columnGroupOrdinal;
+        this.isMeasure = isMeasure;
+        this.columnGroupId = columnGroupId;
+        this.columnUniqueId = columnUniqueId;
+        this.isInvertedIndex = isInvertedIndex;
+    }
+
+    @JsonProperty
+    public String getConnectorId() {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public String getColumnName() {
+        return columnName;
+    }
+
+    @JsonProperty
+    public Type getColumnType() {
+        return columnType;
+    }
+
+    @JsonProperty
+    public int getOrdinalPosition() {
+        return ordinalPosition;
+    }
+
+    // The getters below carry explicit property names so that each value is written under
+    // the same name the @JsonCreator constructor reads it from.
+    @JsonProperty("keyOrdinal")
+    public int getKeyOrdinal() {
+        return keyOrdinal;
+    }
+
+    @JsonProperty("columnGroupOrdinal")
+    public int getColumnGroupOrdinal() {
+        return columnGroupOrdinal;
+    }
+
+    @JsonProperty("isMeasure")
+    public boolean isMeasure() {
+        return isMeasure;
+    }
+
+    @JsonProperty("columnGroupId")
+    public int getColumnGroupId() {
+        return columnGroupId;
+    }
+
+    @JsonProperty("columnUniqueId")
+    public String getColumnUniqueId() {
+        return columnUniqueId;
+    }
+
+    @JsonProperty("isInvertedIndex")
+    public boolean isInvertedIndex() {
+        return isInvertedIndex;
+    }
+
+    /** Builds a {@link ColumnMetadata} from this handle's column name and type. */
+    public ColumnMetadata getColumnMetadata() {
+        return new ColumnMetadata(columnName, columnType, null, false);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(connectorId, columnName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+        CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId)
+                && Objects.equals(this.columnName, other.columnName);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("connectorId", connectorId)
+                .add("columnName", columnName)
+                .add("columnType", columnType)
+                .add("ordinalPosition", ordinalPosition)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..90b4944
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link Connector} for CarbonData, built from the components passed to its constructor.
+ */
+public class CarbondataConnector
+        implements Connector {
+    private static final Logger log = Logger.get(CarbondataConnector.class);
+
+    private final LifeCycleManager lifeCycleManager;
+    private final CarbondataMetadata metadata;
+    private final ConnectorSplitManager splitManager;
+    private final ConnectorRecordSetProvider recordSetProvider;
+    private final ClassLoader classLoader;
+
+    public CarbondataConnector(
+            LifeCycleManager lifeCycleManager,
+            CarbondataMetadata metadata,
+            ConnectorSplitManager splitManager,
+            ConnectorRecordSetProvider recordSetProvider,
+            ClassLoader classLoader) {
+        this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+        this.metadata = requireNonNull(metadata, "metadata is null");
+        this.splitManager = requireNonNull(splitManager, "splitManager is null");
+        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    /** Checks the requested level against READ_COMMITTED, then returns the shared handle. */
+    @Override
+    public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) {
+        checkConnectorSupports(READ_COMMITTED, isolationLevel);
+        return CarbondataTransactionHandle.INSTANCE;
+    }
+
+    /** Passes this connector's class loader to the metadata before returning it. */
+    @Override
+    public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+        metadata.putClassLoader(classLoader);
+        return metadata;
+    }
+
+    @Override
+    public ConnectorSplitManager getSplitManager() {
+        return splitManager;
+    }
+
+    @Override
+    public ConnectorRecordSetProvider getRecordSetProvider() {
+        return recordSetProvider;
+    }
+
+    /** Stops the life cycle manager; a failure is logged rather than rethrown. */
+    @Override
+    public final void shutdown() {
+        try {
+            lifeCycleManager.stop();
+        } catch (Exception stopFailure) {
+            log.error(stopFailure, "Error shutting down connector");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..324699c
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ConnectorFactory} that builds {@link CarbondataConnector} instances. Each connector
+ * is wired from an injector bootstrapped with {@link JsonModule} and {@link CarbondataModule},
+ * inside a {@link ThreadContextClassLoader} scope for the supplied class loader.
+ */
+public class CarbondataConnectorFactory
+        implements ConnectorFactory {
+    private final String name;
+    private final ClassLoader classLoader;
+
+    /** Stores the connector name as given; {@code classLoader} must not be null. */
+    public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+        this.name = connectorName;
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public ConnectorHandleResolver getHandleResolver() {
+        return new CarbondataHandleResolver();
+    }
+
+    /**
+     * Bootstraps an injector for the given configuration and assembles a connector from it.
+     *
+     * @param connectorId id passed on to {@link CarbondataModule}
+     * @param config connector configuration properties, must not be null
+     * @param context supplies the type manager passed on to {@link CarbondataModule}
+     * @return the assembled connector
+     */
+    @Override
+    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
+        requireNonNull(config, "config is null");
+
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            Injector injector = new Bootstrap(
+                    new JsonModule(),
+                    new CarbondataModule(connectorId, context.getTypeManager()))
+                    .strictConfig()
+                    .doNotInitializeLogging()
+                    .setRequiredConfigurationProperties(config)
+                    .initialize();
+
+            LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+            CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+            ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+            ConnectorRecordSetProvider recordSetProvider =
+                    injector.getInstance(ConnectorRecordSetProvider.class);
+
+            // Only the split manager is wrapped in a class-loader-safe decorator; the record
+            // set provider is handed to the connector exactly as obtained from the injector.
+            return new CarbondataConnector(
+                    lifeCycleManager,
+                    metadata,
+                    new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+                    recordSetProvider,
+                    classLoader);
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
new file mode 100755
index 0000000..5aa72f1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Value object wrapping a connector id string; equality and hashing are based on
+ * that string.
+ */
+public class CarbondataConnectorId {
+    private final String id;
+
+    /** @param id connector id, must not be null */
+    @Inject
+    public CarbondataConnectorId(String id) {
+        this.id = requireNonNull(id, "id is null");
+    }
+
+    /** Returns the raw id string. */
+    @Override
+    public String toString() {
+        return id;
+    }
+
+    /** Hash derived from the id string alone. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+
+    /** Two instances are equal when they share the exact class and have equal id strings. */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // Exact class comparison; the cast below is only reached when it holds.
+        boolean sameType = obj != null && obj.getClass() == getClass();
+        return sameType && Objects.equals(id, ((CarbondataConnectorId) obj).id);
+    }
+}
\ No newline at end of file